Sending normal messages

Normal messages support both synchronous and asynchronous sending. Users can choose as per the use case.

Synchronous sending

Synchronous sending refers to the client sending a message to the server and waiting for the server to respond before sending the next message.

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, can be obtained on the instance list page
        for (int i = 0; i < 128; i++)
            try {
                    Message msg = new Message("Topic_Name",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
            } catch (Exception e) {

Asynchronous sending

Asynchronous sending refers to the client sending a message and then sending the next message without waiting for the server to return.

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, can be obtained on the instance list page
        for (int i = 0; i < 128; i++)
            try {
                    Message msg = new Message("Topic_Name",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        public void onSuccess(SendResult result) {
                            // Send successfully
                            System.out.println("send message success");
                        public void onException(Throwable throwable) {
                            // Send failed, can be sent again or store this message for compensation
                            System.out.println("send message failed.");
            } catch (Exception e) {
            // Delay 2 seconds to stop the producer, needed for testing, can be removed in actual use
        } catch (Exception e) {

Subscribing to normal messages

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class PushConsumer {
    // Instance access using public and private keys, available on the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // "GROUP_NAME" is the consumer group, can be obtained on the instance Group management page
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely());
        // Instance access address, can be obtained on the instance list page
        consumer.subscribe("Topic_Name", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        System.out.printf("Consumer Started.%n");