收發定時/延時消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
分布式消息服務RocketMQ版支持任意時間的定時消息,最大推遲時間可達到40天。
定時消息即生產者生產消息到分布式消息服務RocketMQ版后,消息不會立即被消費,而是延遲到設定的時間點后才會發送給消費者進行消費。
發送定時消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
適用場景
定時消息適用于以下場景:
- 消息對應的業務邏輯有時間窗口要求,如電商交易中超時未支付關閉訂單的場景。在訂單創建時發送一條定時消息,5分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應訂單是否完成支付,如果未完成支付,則關閉訂單。如果已完成,則忽略。
- 通過消息觸發定時任務的場景,如在某些固定時間點向用戶發送提醒消息。
注意
定時消息的最大延遲時間為40天,延遲超過40天的消息將會發送失敗。
定時消息的定時時間如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
定時消息的精度有1s~2s的延遲誤差
無法確保定時消息僅投遞一次,定時消息可能會重復投遞。
定時消息的定時時間是服務端開始向消費端投遞的時間。如果消費者當前有消息堆積,那么定時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。
由于客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差,以服務端時間為準。
設置定時消息的投遞時間后,依然受消息老化時間限制,默認消息過期時間為7天。例如,設置定時消息5天后才能被消費,如果第5天后一直沒被消費,那么這條消息將在第12天被刪除。
定時消息將占用普通消息約3倍的存儲空間,大量使用定時消息時需要注意存儲空間占用。
準備環境
開源的Java客戶端支持連接分布式消息服務RocketMQ版,推薦使用的客戶端版本為4.9.7。
通過以下任意一種方式引入依賴:
-
使用Maven方式引入依賴。
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.7</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.9.7</version> </dependency> </dependencies> -
點擊下載依賴JAR包:rocketmq-all-4.9.7-bin-release.zip
發送定時/延時消息
發送定時/延時消息的示例代碼如下:
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerTimerDelayExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Producer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
// 填入控制臺NAMESRV接入點地址
producer.setNamesrvAddr("XXX:xxx");
//producer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("TopicTest",
"YOUR MESSAGE TAG",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
/*
* 發送延時消息,需要設置延時時間,單位毫秒(ms),消息將在指定延時時間后投遞,例如消息將在3秒后投遞。
*/
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
/*
* 若需要發送定時消息,則需要設置定時時間,消息將在指定時間進行投遞,例如消息將在2022-10-10 10:10:00投遞。
* 定時時間格式為:yyyy-MM-dd HH:mm:ss,若設置的時間戳在當前時間之前,則消息將被立即投遞給Consumer。
* long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2022-10-10 10:10:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
消費定時/延時消息
消費定時/延時消息的示例代碼如下
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class ConsumerDelayExample {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
"accessKey", // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
"accessSecret" // 分布式消息服務RocketMQ控制臺用戶管理菜單中創建的密鑰
));
}
public static void main(String[] args) throws Exception {
/*
* 創建Consumer,如果想開啟消息軌跡,可以按照如下方式創建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely(), true, null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new
AllocateMessageQueueAveragely());
// 填入控制臺NAMESRV接入點地址
consumer.setNamesrvAddr("XXX:xxx");
// consumer.setUseTLS(true); // 如果需要開啟SSL,請增加此行代碼
/*
* 如果想要消費指定TAG的消息,可以按照如下方式訂閱:* 為訂閱所有的TAG
* pushConsumer.subscribe(TOPIC_NAME, "Tag1");
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer Started.");
}
}