收發事務消息
更新時間 2025-04-22 14:04:26
最近更新時間: 2025-04-22 14:04:26
分享文章
分布式消息服務RocketMQ版的事務消息支持在業務邏輯與發送消息之間提供事務保證,通過兩階段的方式提供對事務消息的支持,事務消息交互流程如圖1所示。
圖1 事務消息交互流程
事務消息生產者首先發送半消息,然后執行本地事務。如果執行成功,則發送事務提交,否則發送事務回滾。服務端在一段時間后如果一直收不到提交或回滾,則發起回查,生產者在收到回查后重新發送事務提交或回滾。消息只有在提交之后才投遞給消費者,消費者對回滾的消息不可見。
收發事務消息前,請參考收集連接信息收集RocketMQ所需的連接信息。
準備環境
-
在命令行輸入python,檢查是否已安裝Python。得到如下回顯,說明Python已安裝。
PS?C:\>?python Python?3.9.9?(tags/v3.9.9:ccb0e6a,?Nov?15?2021,?18:08:50)?[MSC?v.1929?64?bit?(AMD64)]?on?win32 Type?"help",?"copyright",?"credits"?or?"license"?for?more?information.如果未安裝Python,請使用以下命令安裝:
pip?install?rocketmq-client-python -
安裝librocketmq庫和rocketmq-client-python。
說明建議下載rocketmq-client-cpp-2.2.0,獲取librocketmq庫。
-
將librocketmq.so添加到系統動態庫搜索路徑。
- 查找librocketmq.so的路徑。
find?/?-name?librocketmq.so - 將librocketmq.so添加到系統動態庫搜索路徑。
ln?-s?/查找到的librocketmq.so路徑/librocketmq.so?/usr/lib sudo?ldconfig
- 查找librocketmq.so的路徑。
以下示例代碼中的參數說明如下,請參考收集連接信息獲取參數值。
- GROUP:表示消費組名稱。
- ENDPOINT:表示實例連接地址和端口。
- TOPIC:表示Topic名稱。
發送消息
參考如下示例代碼。
import?time
from?rocketmq.client?import?TransactionMQProducer,?Message,?TransactionStatus
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
producer_group?=?"${GROUP}"??#?生產者組group
def?transaction_checker_callback(msg,?user_args):
????return?TransactionStatus.COMMIT
def?transaction_local_execute(msg,?user_args):
????return?TransactionStatus.UNKNOWN
producer?=?TransactionMQProducer(producer_group,?transaction_checker_callback)
producer.set_name_server_address(endpoint)
producer.set_session_credentials(access_key,?access_secret,?"")
producer.start()
msg?=?Message(topic)
msg.set_body("Hello?RocketMQ")
ret?=?producer.send_message_in_transaction(msg,?transaction_local_execute,?None)
print(ret.status,?ret.msg_id,?ret.offset)
while?True:
????time.sleep(3600)
訂閱消息
參考如下示例代碼。
import?time
from?rocketmq.client?import?PushConsumer,?ConsumeStatus
endpoint?=?"${ENDPOINT}"??#?填寫分布式消息服務RocketMQ控制臺Namesrv接入點
access_key?=?"${ACCESS_KEY}"??#?填寫AccessKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶ID
access_secret?=?"${SECRET_KEY}"??#?填寫SecretKey 在分布式消息服務RocketMQ控制臺用戶管理菜單中創建的用戶密鑰
topic?=?"${TOPIC}"??#?填寫Topic,在管理控制臺創建
group?=?"${GROUP}"??#?填寫訂閱組group,在管理控制臺創建
def?callback(msg):
????print(msg.id,?msg.body)
????return?ConsumeStatus.CONSUME_SUCCESS
consumer?=?PushConsumer(group)
consumer.set_name_server_address(endpoint)
consumer.set_session_credentials(access_key,?access_secret,?"")
consumer.subscribe(topic,?callback)
consumer.start()
while?True:
????time.sleep(3600)
consumer.shutdown()