Message Tracing

Business operations using message tracing require enabling message tracing on both the production side and the consumption side.

Enable on Production Side

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class Producer { // Instance access uses public and private keys, which can be obtained from the instance token management page private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { // "ProducerGroupName" is the producer group, users can use the producer group created in the console or customize it, the final argument is set to "" DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook(), true, ""); // Instance access address, can obtain from instance list page producer.setNamesrvAddr(""); producer.start(); for(int i = 0; i < 128; i++) try { { Message msg = new Message("Topic_Name", "Message_Tag", "Message_Key", "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }

Enable on Consumption Side

import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.remoting.RPCHook; public class PushConsumer { // Instance access uses public and private keys, which can be obtained from the instance token management page private static final String ACCESS_KEY = "xxx"; private static final String SECRET_KEY = "xxx"; static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)); } public static void main(String[] args) throws InterruptedException, MQClientException { // "GROUP_NAME" is the consumer group, you can get it on the instance group management page, the last argument is set to "" DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, ""); // Instance access address, can get from the instance list page consumer.setNamesrvAddr(""); consumer.subscribe("Topic_Name", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }