收發順序消息
更新時間 2025-04-22 14:04:25
最近更新時間: 2025-04-22 14:04:25
分享文章
順序消息是分布式消息服務RocketMQ版提供的一種嚴格按照順序來發布和消費的消息類型。
順序消息分為全局順序消息和分區順序消息:
- 全局順序消息:對于指定的一個Topic,將隊列數量設置為1,這個隊列內所有消息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和訂閱。
- 分區順序消息:對于指定的一個Topic,同一個隊列內的消息按照嚴格的FIFO順序進行發布和訂閱。生產者指定分區選擇算法,保證需要按順序消費的消息被分配到同一個隊列。
全局順序消息和分區順序消息的區別僅為隊列數量不同,代碼沒有區別。
收發順序消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
執行以下命令,檢查是否已安裝Go。
go?version返回如下回顯時,說明Go已經安裝。
go?version?go1.21.5?linux/amd64如果未安裝Go,請官網下載并安裝。
-
在“go.mod”中增加以下代碼,添加依賴。
module?rocketmq-example-go go?1.13 require?( github.com/apache/rocketmq-client-go/v2?v2.1.2 )
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送順序消息
參考如下示例代碼。
package?main
import?(
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
)
func?main()?{
//?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint?:=?"${ENDPOINT}"
//?填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey?:=?"${ACCESS_KEY}"
//?填寫SecretKey?在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey?:=?"${SECRET_KEY}"
//?填寫Topic,在管理控制臺創建
topic?:=?"${TOPIC}"
p,?_?:=?rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey:?accessKey,
SecretKey:?secretKey,
}),
//?配置使用HASH隊列選擇器.
producer.WithQueueSelector(producer.NewHashQueueSelector()),
)
err?:=?p.Start()
if?err?!=?nil?{
fmt.Printf("start?producer?error:?%s",?err.Error())
os.Exit(1)
}
for?i?:=?0;?i?<?4;?i++?{
msg?:=?&primitive.Message{
Topic:?topic,
Body:??[]byte("Hello?RocketMQ?"?+?strconv.Itoa(i)),
}
//?msg?指定分區key,?自定義即可
msg.WithShardingKey("key"?+?strconv.Itoa(i))
res,?err?:=?p.SendSync(context.Background(),?msg)
if?err?!=?nil?{
fmt.Printf("send?message?error:?%s\n",?err)
}?else?{
fmt.Printf("send?message?success:?result=%s\n",?res.String())
}
}
err?=?p.Shutdown()
if?err?!=?nil?{
fmt.Printf("shutdown?producer?error:?%s",?err.Error())
}
}
訂閱順序消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
// 填寫分布式消息服務RocketMQ控制臺Namesrv接入點
endpoint := "${ENDPOINT}"
// 填寫AccessKey,在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
accessKey := "${ACCESS_KEY}"
// 填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
secretKey := "${SECRET_KEY}"
// 填寫Topic,在管理控制臺創建
topic := "${TOPIC}"
// 在控制臺創建的訂閱組(Group)
group := "${GROUP}"
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(group),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
}),
// 使用順序消費的模式
consumer.WithConsumerOrder(true),
)
err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
fmt.Printf("orderlyCtx: %#v\n", orderlyCtx)
for i := range messages {
// 處理消息
fmt.Printf("receive msg: %v \n", messages[i])
}
// 如果消息處理成功則返回consumer.ConsumeSuccess
// 如果消息處理失敗則返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
waitChan := make(chan interface{}, 0)
<-waitChan
}