Sending and Receiving Normal Messages
Sending normal messages
Normal messages support both synchronous and asynchronous sending. Users can choose as per the use case.
Synchronous sending
Synchronous sending refers to the client sending a message to the server and waiting for the server to respond before sending the next message.
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
// Instance access using public and private keys, available on the instance token management page
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
// "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
// Instance access address, can be obtained on the instance list page
producer.setNamesrvAddr("1.1.1.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("Topic_Name",
"Message_Tag",
"Message_Key",
"Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
Asynchronous sending
Asynchronous sending refers to the client sending a message and then sending the next message without waiting for the server to return.
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
// Instance access using public and private keys, available on the instance token management page
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
// "ProducerGroupName" is the production group, users can use the production Group created in the console or customize
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
// Instance access address, can be obtained on the instance list page
producer.setNamesrvAddr("1.1.1.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("Topic_Name",
"Message_Tag",
"Message_Key",
"Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// Send successfully
System.out.println("send message success");
}
@Override
public void onException(Throwable throwable) {
// Send failed, can be sent again or store this message for compensation
System.out.println("send message failed.");
throwable.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
try
{
// Delay 2 seconds to stop the producer, needed for testing, can be removed in actual use
Thread.sleep(2000);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Subscribing to normal messages
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class PushConsumer {
// Instance access using public and private keys, available on the instance token management page
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
}
public static void main(String[] args) throws InterruptedException, MQClientException {
// "GROUP_NAME" is the consumer group, can be obtained on the instance Group management page
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely());
// Instance access address, can be obtained on the instance list page
consumer.setNamesrvAddr("1.1.1.1:8100");
consumer.subscribe("Topic_Name", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}