Scheduled/Delayed Messages
Scheduled/delayed messages are messages that, after being sent to the server, are not available for consumption immediately. They are delivered to the real topic only after a specified period of time.
URocketMQ offers two types of scheduling/delaying:
- Open Source Fixed Gradient Delay: Open source RocketMQ supports delayed messages at fixed delay levels. The default configuration has 18 levels: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”. Each level corresponds to one delay time; for example, level 1 corresponds to 1s and level 5 corresponds to 1m.
- Custom Scheduled/Delay: URocketMQ supports scheduled and delayed messages with second-level precision. Users can schedule a message for a specific time, or delay it by any number of seconds, in the future. For usage limitations, see Message Types.
Scheduled/delayed messages only need to be handled on the producer side; no special handling is required on the consumer side.
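The mapping between a fixed delay level and its delay time can be sketched as follows. This is only an illustration built from the default level string quoted above; the class `DelayLevels` and its `parse` method are hypothetical helpers, not part of the RocketMQ client, and the sketch assumes only the `s`, `m`, and `h` units that appear in that string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DelayLevels {
    // Default level string as quoted in this document.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Hypothetical helper: maps each 1-based delay level to its delay in seconds.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long seconds;
            switch (unit) {
                case 's': seconds = value; break;
                case 'm': seconds = value * 60; break;
                case 'h': seconds = value * 3600; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            table.put(i + 1, seconds); // levels start at 1
        }
        return table;
    }

    public static void main(String[] args) {
        // Print the level table so a level can be picked for setDelayTimeLevel.
        for (Map.Entry<Integer, Long> e : parse(DEFAULT_LEVELS).entrySet()) {
            System.out.println("level " + e.getKey() + " -> " + e.getValue() + "s");
        }
    }
}
```

A table like this helps when choosing the argument for `msg.setDelayTimeLevel(...)`: the level is an index into the configured list, not a duration.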
Producing Scheduled/Delayed messages
Open Source Fixed Gradient Delay
You can set different delay levels according to actual needs, as follows:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
    // Public and private keys used for access, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // "ProducerGroupName" is the producer group; use a group created in the console or define your own
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();
        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Set the delay level as needed; level 3 corresponds to 10s in the default configuration
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // A failed send is logged and the loop continues with the next message
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
Custom Scheduled/Delay
The code for custom scheduled messages and custom delayed messages differs slightly, as follows:
Custom Scheduled Messages
import java.text.SimpleDateFormat;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
    // Public and private keys used for access, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // "ProducerGroupName" is the producer group; use a group created in the console or define your own
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();
        for (int i = 0; i < 128; i++) {
            try {
                // Delivery time has second-level precision; startDeliverTime is a timestamp in milliseconds.
                // SimpleDateFormat parses the string in the JVM default time zone.
                long startDeliverTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-11-01 11:49:00").getTime();
                Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(startDeliverTime));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // A failed send is logged and the loop continues with the next message
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
Custom Delay Messages
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class Producer {
    // Public and private keys used for access, obtainable from the instance token management page
    private static final String ACCESS_KEY = "xxx";
    private static final String SECRET_KEY = "xxx";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // "ProducerGroupName" is the producer group; use a group created in the console or define your own
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        // Instance access address, obtainable from the instance list page
        producer.setNamesrvAddr("1.1.1.1:9876");
        producer.start();
        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("Topic_Name",
                        "Message_Tag",
                        "Message_Key",
                        "Message Content Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                long currentMillis = System.currentTimeMillis();
                // Delay message i by i seconds; startDeliverTime is a timestamp in milliseconds
                long startDeliverTime = currentMillis + i * 1000L;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(startDeliverTime));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // A failed send is logged and the loop continues with the next message
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
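Both custom examples above set the same `__STARTDELIVERTIME` property to a timestamp in milliseconds; they differ only in how that timestamp is computed. A minimal sketch of the two computations using `java.time` is shown below. The class `DeliverTime` and its methods `at` and `after` are hypothetical helpers introduced here for illustration, and passing an explicit time zone is an assumption of this sketch (the examples above rely on the JVM default zone).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DeliverTime {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Scheduled message: convert an absolute wall-clock time in the given zone to epoch milliseconds.
    static long at(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    // Delayed message: add a relative delay to a base timestamp in milliseconds.
    static long after(long nowMillis, Duration delay) {
        return nowMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        long scheduled = at("2021-11-01 11:49:00", ZoneId.of("UTC"));
        long delayed = after(System.currentTimeMillis(), Duration.ofSeconds(30));
        // Either value would be passed as String.valueOf(...) to msg.putUserProperty("__STARTDELIVERTIME", ...)
        System.out.println(String.valueOf(scheduled));
        System.out.println(String.valueOf(delayed));
    }
}
```

Making the zone explicit avoids a scheduled time shifting when the producer runs on hosts configured with different default time zones.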
Subscribing to Scheduled/Delayed messages
Subscribing to scheduled/delayed messages works the same way as subscribing to normal messages. For details, see Subscribing to Normal Messages.