Docs
rocketmq
Access Guide
Message Trace

Message Tracing

To use message tracing, you must enable it on both the producer side and the consumer side.

Enable on the Producer Side

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * Sample producer that sends a fixed number of messages with message tracing enabled.
 */
public class Producer {
    // Instance access uses public and private keys, which can be obtained from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    // Number of sample messages sent by main()
    private static final int MESSAGE_COUNT = 128;

    /**
     * Builds the ACL hook that attaches the access/secret key pair to client requests.
     *
     * @return an RPC hook carrying the configured credentials
     */
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }

    /**
     * Starts a trace-enabled producer, sends the sample messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException kept for signature compatibility; an interrupt during sending
     *         is handled by restoring the interrupt flag and stopping the loop
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // "ProducerGroupName" is the producer group; use one created in the console or a custom name.
        // The third argument (true) enables message tracing; the final argument is set to "".
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook(), true, "");
        // Instance access address, can be obtained from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();

        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                try {
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag and stop sending.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // A failed send should not abort the remaining sample messages.
                    e.printStackTrace();
                }
            }
        } finally {
            // Always release client resources once the producer has been started.
            producer.shutdown();
        }
    }
}

Enable on the Consumer Side

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * Sample push consumer that subscribes to a topic with message tracing enabled
 * and prints every batch of messages it receives.
 */
public class PushConsumer {
    // Public/private key pair for instance access; obtain it from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    /**
     * Creates the ACL hook built from the configured access and secret keys.
     *
     * @return an RPC hook carrying the session credentials
     */
    static RPCHook getAclRPCHook() {
        SessionCredentials credentials = new SessionCredentials(ACCESS_KEY, SECRET_KEY);
        return new AclClientRPCHook(credentials);
    }

    /**
     * Configures the consumer, registers the printing listener and starts consuming.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        RPCHook aclHook = getAclRPCHook();
        AllocateMessageQueueAveragely allocationStrategy = new AllocateMessageQueueAveragely();

        // "GROUP_NAME" is the consumer group (see the instance group management page);
        // the boolean enables message tracing and the last argument is set to ""
        DefaultMQPushConsumer pushConsumer =
            new DefaultMQPushConsumer("GROUP_NAME", aclHook, allocationStrategy, true, "");

        // Instance access address, available on the instance list page
        pushConsumer.setNamesrvAddr("1.1.1.1:8100");
        pushConsumer.subscribe("Topic_Name", "*");
        pushConsumer.registerMessageListener(new PrintingListener());

        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }

    /**
     * Listener that prints the consuming thread's name together with the received
     * messages and reports the batch as successfully consumed.
     */
    private static final class PrintingListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            String threadName = Thread.currentThread().getName();
            System.out.printf("%s Receive New Messages: %s %n", threadName, msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}