Docs
rocketmq
Access Guide
Receive and Send Timed Delay Messages

Scheduled/Delayed Messages

Scheduled/delayed messages mean that after the message is sent to the server, it will not be consumed immediately, but will be delivered to the real topic after a certain period of time.

URocketMQ offers two types of scheduling/delaying:

  • Open Source Fixed Gradient Delay: The open source RocketMQ supports delayed messages with fixed gradients. After the message is sent, it will not be consumed immediately but will be delivered to the real topic after a certain period of time. The default configuration has 18 levels: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, each level corresponds to a different delay time, for example 1 corresponds to 1s, 5 corresponds to 1m.
  • Custom Scheduled/Delay: URocketMQ supports any second-level delay messages, users can custom schedule or delay to send messages any second in the future, for usage limitations please see Message Types.

Scheduled/delayed messages only need to be handled on the production side, no special handling is required on the consumption side.

Producing Scheduled/Delayed messages

Open Source Fixed Gradient Delay

You can set different delay levels according to actual needs, as follows:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

public class Producer {
    // Access using public and private keys, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize one
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Can set different delay levels as needed
                    msg.setDelayTimeLevel(3);
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

Custom Scheduled/Delay

Custom scheduling and delay codes are slightly different, as follows:

Custom Scheduled Messages

import java.text.SimpleDateFormat;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

public class Producer {
    // Access using public and private keys, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize one
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    // Supports second-level unit delay, startDeliverTime unit is milliseconds
                    long startDeliverTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-11-01 11:49:00").getTime();
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(startDeliverTime));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

Custom Delay Messages

import java.text.SimpleDateFormat;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

public class Producer {
    // Access using public and private keys, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // "ProducerGroupName" is the production group, users can use the production Group created in the console or customize one
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    long currMillseconds = System.currentTimeMillis();
                    // Supports second-level unit delay, startDeliverTime unit is milliseconds
                    long startDeliverTime = currMillseconds + i * 1000;
                    msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(startDeliverTime));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

Subscribing to Scheduled/Delayed messages

Subscribing to Scheduled/Delayed messages is consistent with subscribing to normal messages, for details please refer to Subscribing to Normal Messages.