Skip to Content
Access Guide > Receive and Send Normal Messages

Sending and Receiving Normal Messages

Sending normal messages

Normal messages can be sent either synchronously or asynchronously. Choose the mode that fits your use case.

Synchronous sending

Synchronous sending refers to the client sending a message to the server and waiting for the server to respond before sending the next message.

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class Producer { // Instance access using public and private keys, available on the instance token management page private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook()); // Instance access address, can be obtained on the instance list page producer.setNamesrvAddr("1.1.1.1:9876"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("Topic_Name", "Message_Tag", "Message_Key", "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }

Asynchronous sending

Asynchronous sending means the client sends a message and then proceeds to send the next one without waiting for the server's response; the result of each send is delivered later through a callback.

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class Producer { // Instance access using public and private keys, available on the instance token management page private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook()); // Instance access address, can be obtained on the instance list page producer.setNamesrvAddr("1.1.1.1:9876"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("Topic_Name", "Message_Tag", "Message_Key", "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // Send successfully System.out.println("send message success"); } @Override public void onException(Throwable throwable) { // Send failed, can be sent again or store this message for compensation System.out.println("send message failed."); throwable.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); } try { // Delay 2 seconds to stop the producer, needed for testing, can be removed in actual use Thread.sleep(2000); producer.shutdown(); } 
catch (Exception e) { e.printStackTrace(); } } }

Subscribing to normal messages

import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class PushConsumer { // Instance access using public and private keys, available on the instance token management page private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws InterruptedException, MQClientException { // "GROUP_NAME" is the consumer group, can be obtained on the instance Group management page DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely()); // Instance access address, can be obtained on the instance list page consumer.setNamesrvAddr("1.1.1.1:8100"); consumer.subscribe("Topic_Name", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }