Docs
rocketmq
Access Guide
Receive and Send Transaction Messages

Sending and Receiving Transaction Messages

An application's local transaction and its message-send operation can be combined into a single global transaction that either succeeds or fails as a whole. In this way, transaction messages provide eventual consistency for distributed transactions.

Sending Transaction Messages

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Example producer that sends transaction messages.
 *
 * <p>It registers a {@link TransactionListener}, sends ten messages with
 * {@code sendMessageInTransaction}, then stays alive for a while before
 * shutting the producer down.
 */
public class TransactionProducer {
    // Access key and secret key used for instance access; both can be obtained
    // on the instance token management page.
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    /**
     * Builds the RPC hook that carries the access credentials.
     *
     * @return an {@link AclClientRPCHook} wrapping {@code ACCESS_KEY} and {@code SECRET_KEY}
     */
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }

    /**
     * Starts a transactional producer, sends ten transaction messages and then
     * idles before shutting down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // Place the local transaction logic here; this demo always commits.
                System.out.println("Execute Local Transaction");
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // Place the local transaction status lookup here; this demo always commits.
                System.out.println("Check Transaction Status");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        // "ProducerGroupName" is the producer group; use a group created in the console or a custom one.
        TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName", getAclRPCHook());

        // Thread pool handed to the producer via setExecutorService.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

        producer.setExecutorService(executorService);
        // Instance access address, which can be obtained on the instance list page.
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.setTransactionListener(transactionListener);
        producer.start();

        // From here on the producer is running, so always shut it down, even when
        // a sleep below is interrupted and InterruptedException leaves main.
        try {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("Topic_Name", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);

                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    // A failed send is reported and the loop moves on to the next message.
                    e.printStackTrace();
                }
            }

            // Keep the process alive after sending; presumably so that server
            // check-back requests can still reach checkLocalTransaction.
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
        } finally {
            producer.shutdown();
        }
    }
}

executeLocalTransaction and checkLocalTransaction are the two callbacks that the producer must implement. They are described as follows:

  • executeLocalTransaction: Execute local transaction logic.
  • checkLocalTransaction: If the server has not received a commit or rollback request for a long time, it sends a check-back request to the client. On receiving it, the client checks the status of the local transaction and resends the commit or rollback request.

Both callbacks return one of the following three transaction states:

  • LocalTransactionState.COMMIT_MESSAGE: Commit the transaction, allow the subscription side to consume this message;
  • LocalTransactionState.ROLLBACK_MESSAGE: Roll back the transaction, the message will be discarded and not allowed to be consumed;
  • LocalTransactionState.UNKNOW: Unknown state, the server will ask the producer to check the status of this message again.

Subscribe Transaction Messages

Subscribing to transaction messages works the same way as subscribing to ordinary messages; see Subscribe Ordinary Messages for details.