如果消息重復消費會影響您的業務處理,要對消息做冪等處理。本文介紹消息冪等的概念、適用場景以及處理方法。
概念
在消息領域,冪等是指Consumer重復消費某條消息時,重復消費的結果與消費一次的結果是相同的,并且多次消費并未對業務系統產生任何負面影響。
例如,在支付場景下,Consumer消費扣款消息,對一筆訂單執行扣款操作,扣款金額為500元。如果因網絡不穩定等原因導致扣款消息重復投遞,Consumer重復消費了該扣款消息,但最終的業務結果是只扣款一次,扣費500元,且用戶的扣款記錄中對應的訂單只有一條扣款流水,不會多次扣除費用。那么這次扣款操作是符合要求的,整個消費過程實現了消息冪等。
適用場景
在互聯網應用中,尤其在網絡不穩定的情況下,分布式消息服務RabbitMQ的消息有可能會出現重復。如果消息重復消費會影響您的業務處理,請對消息做冪等處理。消息重復的可能原因如下:
- 發送時消息重復
當一條消息已被成功發送到服務端并完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。 如果此時Producer意識到消息發送失敗并嘗試再次發送消息,Consumer后續會收到兩條內容相同并且Message ID也相同的消息。
- 投遞時消息重復
消息消費的場景下,消息已投遞到Consumer并完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。為了保證消息至少被消費一次,分布式消息服務RabbitMQ的服務端將在網絡恢復后再次嘗試投遞之前已被處理過的消息,Consumer后續會收到兩條內容相同并且Message ID也相同的消息。
- 負載均衡時消息重復(包括但不限于網絡抖動、服務端重啟以及Consumer應用重啟)
當分布式消息服務RabbitMQ的服務端或客戶端重啟、擴容或縮容時,會觸發Rebalance,此時Consumer可能會收到重復消息。
處理方法
以Message ID為冪等鍵對消息進行冪等處理的步驟如下:
(1)在數據庫中創建一張unique key索引為唯一Message ID的表。
(2)在Producer客戶端為每條消息設置唯一Message ID。
設置唯一Message ID的示例代碼如下:
AMQP.BasicProperties?props?=newAMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("ExchangeName","RoutingKey",true,?props,("消息發送"+?i).getBytes());
(3)在Consumer客戶端根據唯一Message ID對消息進行冪等處理。
根據唯一Message ID進行冪等處理的示例代碼如下:
channel.basicConsume(Producer.QueueName, false, "MyConsumerTag",
new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope env,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 1. 獲取業務唯一性索引數據。
try{
String messageId = properties.getMessageId();
// Message ID或者其他作為unique key的信息。
// 2. 開啟數據庫事務。
idempTable.insert(messageId);
// 3. 對接收到的消息,進行業務邏輯處理。
// 4. 提交或回滾事務。// 處理成功,則進行ACK,否則不要進行ACK。
channel.basicAck(env.getDeliveryTag(), false);
}
catch (數據庫主鍵沖突異常 e){
// 重復消息,直接確認掉。
channel.basicAck(env.getDeliveryTag(), false);
}
}
}
);