Message Tracing
Business operations using message tracing require enabling message tracing on both the production side and the consumption side.
Enable on Production Side
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
// Instance access uses public and private keys, which can be obtained from the instance token management page
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
// "ProducerGroupName" is the producer group, users can use the producer group created in the console or customize it, the final argument is set to ""
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook(), true, "");
// Instance access address, can obtain from instance list page
producer.setNamesrvAddr("1.1.1.1:9876");
producer.start();
for(int i = 0; i < 128; i++)
try {
{
Message msg = new Message("Topic_Name",
"Message_Tag",
"Message_Key",
"Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
Enable on Consumption Side
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class PushConsumer {
// Instance access uses public and private keys, which can be obtained from the instance token management page
private static final String ACCESS_KEY = "xxx";
private static final String SECRET_KEY = "xxx";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
}
public static void main(String[] args) throws InterruptedException, MQClientException {
// "GROUP_NAME" is the consumer group, you can get it on the instance group management page, the last argument is set to ""
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_NAME", getAclRPCHook(),
new AllocateMessageQueueAveragely(), true, "");
// Instance access address, can get from the instance list page
consumer.setNamesrvAddr("1.1.1.1:8100");
consumer.subscribe("Topic_Name", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}