Sending an object with RabbitMQ

Date: 2023-02-20

Problem description

I understand that this question duplicates the question "using rabbitmq to send a message not string but struct".

When I do this the first way suggested there, I get the following trace:

java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
at com.mdnaRabbit.worker.App.main(App.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

I've checked and I'm sure that the message is converted to bytes correctly in the sender class, but the consumer can't receive it.

Here is my producer class:

package com.mdnaRabbit.newt;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main( String[] argv) throws IOException{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        int i = 0;

        do {
            Data message = getMessage();
            byte [] byteMessage = message.getBytes();
            //System.out.println(byteMessage);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
            System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
            i++;
        } while (i<15);

        channel.close();
        connection.close();
    }

    private static Data getMessage(){
        Data data = new Data();
        data.setHeader("header");
        data.setDomainId("abc.com");
        data.setReceiver("me");
        data.setSender("he");
        data.setBody("body");
        return data;
    }

    private static String joinStrings(String[] strings, String delimiter){
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

Here is my consumer class:

package com.mdnaRabbit.worker;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private static int i = 0;
    public static void main( String[] argv )
            throws IOException,
            InterruptedException{

        ExecutorService threader = Executors.newFixedThreadPool(20);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection(threader);
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(20);

        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        try {
            while (true) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    Data message = Data.fromBytes(delivery.getBody());
                    //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());

                    System.out.println(" [" + (i++) +"] Received" + message.getBody());

                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }

}

Here is my Data class:

package com.mdnaRabbit.worker.data;

import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Data implements Serializable{

    public String header;
    public String body;
    public String domainId;
    public String sender;
    public String receiver;

    public void setHeader(String head){
        this.header = head;
    }

    public String getHeader(){
        return header;
    }

    public void setBody(String body){
        this.body = body;
    }

    public String getBody(){
        return body;
    }

    public void setDomainId(String domainId){
        this.domainId = domainId;
    }

    public String getDomainId(){
        return domainId;
    }

    public void setSender(String sender){
        this.sender = sender;
    }

    public String getSender(){
        return sender;
    }

    public String getReceiver(){
        return receiver;
    }

    public void setReceiver(String receiver){
        this.receiver = receiver;
    }


    public byte[] getBytes() {
        byte[]bytes;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try{
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.flush();
            oos.reset();
            bytes = baos.toByteArray();
            oos.close();
            baos.close();
        } catch(IOException e){
            bytes = new byte[] {};
            Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
        }
        return bytes;
    }

    public static Data fromBytes(byte[] body) {
        Data obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(body);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = (Data) ois.readObject();
            ois.close();
            bis.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}

It seems that the consumer always receives the messages, because when I don't try to convert the body into an object and just write System.out.println(delivery.getBody()), it prints bytes.

Recommended answer

It looks like the byte array you receive is empty. This happens because of this catch block in getBytes():

    } catch(IOException e){
        bytes = new byte[] {};
    }
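As a quick way to check this diagnosis, the sketch below opens an ObjectInputStream over an empty array. The class name EmptyPayloadDemo and the helper failsWithEof are made up for illustration and are not part of the question's code. The ObjectInputStream constructor reads the stream header first, so an empty payload should fail with the same EOFException in readStreamHeader that appears in the trace. It also suggests why printing the array proves little: println on a byte[] prints a type-and-hash string even when the array is empty, so the length is the more useful thing to check.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;

final class EmptyPayloadDemo {

    // Hypothetical helper: returns true if opening an ObjectInputStream over
    // the payload fails with EOFException, i.e. the payload is too short to
    // contain the serialization stream header.
    static boolean failsWithEof(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return false; // header was read successfully
        } catch (EOFException e) {
            return true;  // same exception type as in the question's trace
        } catch (IOException e) {
            return false; // some other stream problem, e.g. a corrupt header
        }
    }

    public static void main(String[] args) {
        byte[] empty = new byte[] {};
        System.out.println(empty);                // prints an identity string, not the content
        System.out.println(empty.length);         // prints 0
        System.out.println(failsWithEof(empty));  // prints true
    }
}
```

In the consumer, logging delivery.getBody().length before calling Data.fromBytes would show directly whether an empty body arrived.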

When an exception is thrown, the code doesn't warn you that something is broken; it just sends an empty array instead. You should at least log the error.
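One way to avoid sending an empty array is to let the exception propagate to the caller, so that a failed serialization stops the publish instead of silently producing a broken message. The following is only a minimal sketch under that assumption: the class SerializationSketch, the nested Message class, and the method names toBytes and fromBytes are illustrative stand-ins, not the question's Data class. Note also that logging at Level.ALL, as the Data class in the question does, is unlikely to print anything under the default java.util.logging configuration, because that level's value is below every regular level; Level.SEVERE would be a more visible choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationSketch {

    // Hypothetical stand-in for the question's Data class.
    static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String body;

        Message(String body) {
            this.body = body;
        }
    }

    // Serializes the value; any IOException reaches the caller instead of
    // being replaced by an empty array.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return buffer.toByteArray();
    }

    // Deserializes the bytes; failures are reported to the caller rather
    // than turned into a null return value.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = toBytes(new Message("body"));
        Message copy = (Message) fromBytes(bytes);
        System.out.println(copy.body); // prints body
    }
}
```

With this shape, the producer can decide what to do when serialization fails (log and skip, or abort), and the consumer never sees a zero-length body caused by a swallowed exception.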

The exception is probably thrown because you are trying to serialize a class that is not serializable. To make a class serializable, you have to add "implements Serializable" to its declaration:

public class Data implements Serializable {
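To illustrate the difference the declaration makes, here is a small sketch with two made-up classes, PlainData and SerializableData (both hypothetical, as is the helper serializedSize). Writing an instance of a class that does not implement Serializable makes ObjectOutputStream throw NotSerializableException, which is a subclass of IOException and so would be caught by the catch block shown earlier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class NotSerializableDemo {

    // Hypothetical class without the marker interface.
    static class PlainData {
        String body = "body";
    }

    // Hypothetical class with the marker interface.
    static class SerializableData implements Serializable {
        private static final long serialVersionUID = 1L;
        String body = "body";
    }

    // Returns the serialized size in bytes, or -1 if the object (or something
    // it references) does not implement Serializable.
    static int serializedSize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (NotSerializableException e) {
            return -1;
        }
        return buffer.toByteArray().length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serializedSize(new PlainData()));            // prints -1
        System.out.println(serializedSize(new SerializableData()) > 0); // prints true
    }
}
```

The same exception is raised when a serializable class holds a non-transient field whose type is not serializable, so it is worth checking the field types as well as the class declaration.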
