Docs
rocketmq
Access Guide
Receive and Send Ordinary Messages

Sending and Receiving Normal Messages

Sending normal messages

Normal messages support both synchronous and asynchronous sending. Users can choose as per the use case.

Synchronous sending

Synchronous sending refers to the client sending a message to the server and waiting for the server to respond before sending the next message.

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
 
public class Producer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
 
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
 
        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, can be obtained on the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();
 
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
 
        producer.shutdown();
    }
}

Asynchronous sending

Asynchronous sending refers to the client sending a message and then sending the next message without waiting for the server to return.

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
 
public class Producer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
 
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
 
        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, can be obtained on the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();
 
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult result) {
                            // Send successfully
                            System.out.println("send message success");
                        }
 
                        @Override
                        public void onException(Throwable throwable) {
                            // Send failed, can be sent again or store this message for compensation
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                });
                }
 
            } catch (Exception e) {
                e.printStackTrace();
            }
 
        try
        {
            // Delay 2 seconds to stop the producer, needed for testing, can be removed in actual use
            Thread.sleep(2000);
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Subscribing to normal messages

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
 
public class PushConsumer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
 
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // "GROUP_NAME" is the consumer group, can be obtained on the instance Group management page
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely());
        // Instance access address, can be obtained on the instance list page
        consumer.setNamesrvAddr("1.1.1.1:8100");
        consumer.subscribe("Topic_Name", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}