前提條件
創建分布式消息服務RabbitMQ相應環境。
操作步驟
RabbitMQ是一個開源的消息隊列中間件,支持生產者和消費者之間的異步通信。在上述資源準備完成后,接下來需要編譯工程生產消費,主要分以下幾個步驟:
1、編寫生產者代碼:使用編程語言編寫一個生產者程序。該程序將連接到RabbitMQ服務器,并將消息發送到隊列中。
2、編寫消費者代碼:同樣使用編程語言編寫一個消費者程序。該程序將連接到RabbitMQ服務器,并從隊列中接收消息。
3、運行生產者和消費者:運行生產者程序,它將發送消息到隊列中。然后運行消費者程序,它將從隊列中接收并處理消息。
4、驗證結果:檢查生產者和消費者程序的輸出,確保消息被正確發送和接收。
引入依賴
在使用RabbitMQ時,你需要在你的項目中引入相應的依賴。具體的依賴項可能會因你的項目和需求而有所不同。在使用RabbitMQ之前,請確保查閱官方文檔以獲取最新的依賴項和使用說明。
以Java編程語言為例,可以使用RabbitMQ的Java客戶端庫。你可以在Maven或Gradle構建工具中添加以下依賴項:
<dependency>??
????<groupId>com.rabbitmq</groupId>??
????<artifactId>amqp-client</artifactId>??
????<version>5.7.0</version>??
</dependency>??
可以通過下載JAR包來引入依賴。
綁定BindingKey
在RabbitMQ中,綁定鍵(Binding Key)是用于綁定交換機(Exchange)和隊列(Queue)的關鍵字。當一個消息被發送到交換機時,交換機會根據綁定鍵將消息路由到相應的隊列中。
綁定鍵是在創建綁定(Binding)時指定的,它定義了消息應該如何被路由到隊列。綁定鍵通常與消息的屬性或內容進行匹配,以確定消息應該發送到哪個隊列。
綁定鍵可以具有不同的形式,取決于使用的交換機類型。以下是一些常見的綁定鍵形式:
-
接匹配(Direct Match):綁定鍵與消息的路由鍵(Routing Key)完全匹配時,消息會被路由到相應的隊列。
-
通配符匹配(Wildcard Match):綁定鍵可以使用通配符進行模式匹配。常見的通配符有和#,其中表示匹配一個單詞,#表示匹配零個或多個單詞。
-
主題匹配(Topic Match):綁定鍵可以使用主題模式進行匹配。主題模式使用.分隔的單詞,可以包含*和#通配符。例如,stock.#可以匹配stock.price、stock.quantity等。
綁定鍵的選擇取決于你的需求和消息的路由策略。通過正確設置綁定鍵,你可以確保消息被正確地路由到相應的隊列中,以便消費者進行處理。
代碼示例:
import?com.rabbitmq.client.BuiltinExchangeType;??
import?com.rabbitmq.client.Channel;??
import?com.rabbitmq.client.Connection;??
import?com.rabbitmq.client.ConnectionFactory;??
import?java.io.IOException;??
import?java.util.concurrent.TimeoutException;??
public?class?RabbitmqBindingKey?{??
?? private?final?static?String?EXCHANGE_NAME?=?"exchangeTest";??
????private?final?static?String?QUEUE_NAME?=?"helloMQ";??
????private?final?static?String?ROUTING_KEY?=?"test";??
????public?static?void?main(String[]?args)?throws?IOException,? TimeoutException?{??
????????//?創建連接工廠??
????????ConnectionFactory?factory?=?new?ConnectionFactory();??
?????????//?設置主機ip??
????????factory.setHost("192.168.3.113");??
????????//?設置amqp的端口號??
????????factory.setPort(5672);??
????????//?設置用戶名密碼??
????????factory.setUsername("rabbitmq");??
????????factory.setPassword("r@bb!tMQ#3333323");??
??????????//?設置Vhost,需要在控制臺先創建??
????????factory.setVirtualHost("vhost");??
?????????//基于網絡環境合理設置超時時間??
????????factory.setConnectionTimeout(30?*?1000);??
????????factory.setHandshakeTimeout(30?*?1000);??
????????factory.setShutdownTimeout(0);??
??????????Connection?connection?=?factory.newConnection();??
????????Channel?channel?=?connection.createChannel();??
????????channel.exchangeDeclare(EXCHANGE_NAME,? BuiltinExchangeType.DIRECT,?true);??
??????????//?創建?${QueueName}。Queue?可以在控制臺創建,也可以用API創建??
????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);??
??????????//?Queue?與?Exchange進行綁定,注冊?BindingKeyTest??
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?ROUTING_KEY);??
??????????connection.close();??
??????}??
}??
完成后,可以在實例列表的交換器選項卡和隊列選項卡查看結果。
生產消息
生產者需要創建一個連接到RabbitMQ服務器,然后創建一個通道(Channel)來進行消息的發布。在發布消息之前,生產者通常需要先聲明一個隊列,以確保消息能夠被正確地路由和接收。
一旦連接和通道建立完成,生產者可以使用basicPublish()方法將消息發布到指定的隊列。在發布消息時,需要指定目標隊列的名稱、消息內容以及其他的屬性。
發布消息后,RabbitMQ將會將消息存儲在隊列中,等待消費者來接收。消費者可以使用相同的客戶端庫來創建連接和通道,并使用basicConsume()方法來訂閱隊列并接收消息。一旦有消息到達隊列,消費者就會收到消息并進行相應的處理。
通過使用RabbitMQ,生產者和消費者可以實現解耦,即它們可以獨立地進行開發和部署。生產者可以按照自己的節奏和需求發布消息,而消費者可以根據自己的處理能力和負載來接收和處理消息。
代碼示例:
import?com.rabbitmq.client.Channel;??
import?com.rabbitmq.client.Connection;??
import?com.rabbitmq.client.ConnectionFactory;??
import?java.io.IOException;??
import?java.nio.charset.StandardCharsets;??
import?java.util.concurrent.TimeUnit;??
import?java.util.concurrent.TimeoutException;??
public?class?RabbitmqProducer?{??
????//?private?final?static?String?EXCHANGE_NAME?=?"exchangeTest";??
????private?final?static?String?QUEUE_NAME?=?"helloMQ";??
????//?private?final?static?String?ROUTING_KEY?=?"test";??
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{??
????????//?創建連接工廠??
????????ConnectionFactory?factory?=?new?ConnectionFactory();??
??????????//?設置主機ip??
????????factory.setHost("192.168.3.113");??
????????//?設置amqp的端口號??
????????factory.setPort(5672);??
????????//?設置用戶名密碼??
????????factory.setUsername("username");??
????????factory.setPassword("password");??
??????????//?設置Vhost,需要在控制臺先創建??
????????factory.setVirtualHost("test");??
??????????//基于網絡環境合理設置超時時間??
????????factory.setConnectionTimeout(30?*?1000);??
????????factory.setHandshakeTimeout(30?*?1000);??
????????factory.setShutdownTimeout(0);??
??????????//?創建一個連接??
????????Connection?connection?=?factory.newConnection();??
??????????//?創建一個頻道??
????????Channel?channel?=?connection.createChannel();??
?????????//?發送方消息確認,channel.confirmSelect();??
????????//?啟用發送方事務機制,channel.txSelect();???
????????//?指定一個隊列??
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);??
????????for?(int?i?=?0;?i?<?100;?i++)?{??
??????????//?發送的消息??
??????????String?message?=?"Hello?rabbitMQ!_"?+?i;??
??????????//?往隊列中發送一條消息,使用默認的交換器??
??????????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes(StandardCharsets.UTF_8));??
??????????//?使用自定義交換器,需要在管理臺預先建好,并設置routing?key??
??????????//?channel.basicPublish(EXCHANGE_NAME,?ROUTING_KEY,?null,?message.getBytes(StandardCharsets.UTF_8));??
??????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
??????????TimeUnit.MILLISECONDS.sleep(100);??
????????}??
??????????//關閉頻道和連接??
????????channel.close();??
????????connection.close();??
??????}??
}??
消息發送后,可以進入控制臺,在實例列表的隊列選項卡查看消息發送狀消息
消費消息
消費者需要創建一個連接到RabbitMQ服務器,然后創建一個通道(Channel)來進行消息的訂閱。在訂閱消息之前,消費者通常需要先聲明一個隊列,以確保能夠正確地接收和處理消息。
一旦連接和通道建立完成,消費者可以使用basicConsume()方法來訂閱指定的隊列,并注冊一個回調函數來處理接收到的消息。當有消息到達隊列時,RabbitMQ會將消息推送給消費者,消費者的回調函數將被調用,從而可以對消息進行處理。
消費者可以根據自己的需求設置消息的確認機制。在默認情況下,消費者在接收到消息后,會自動向RabbitMQ發送一個確認(ack)消息,表示已成功接收并處理該消息。如果消費者在處理消息時發生錯誤,可以選擇不發送確認消息,從而使消息重新進入隊列,以便其他消費者重新處理。
通過使用RabbitMQ,消費者可以實現解耦,即它們可以獨立地進行開發和部署。消費者可以根據自己的處理能力和負載來接收和處理消息,從而實現負載均衡和水平擴展。
代碼示例:
import?com.rabbitmq.client.*;??
import?java.io.IOException;??
import?java.nio.charset.StandardCharsets;??
import?java.util.concurrent.TimeoutException;??
public?class?RabbitmqConsumer?{??
????//隊列名稱??
????private?final?static?String?QUEUE_NAME?=?"helloMQ";??
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{??
?????//創建連接工廠??
???? ConnectionFactory?factory?=?new?ConnectionFactory();??
?????//設置主機ip??
?????factory.setHost("192.168.3.113");??
?????//設置amqp的端口號??
?????factory.setPort(5672);??
?????//設置用戶名密碼??
?????factory.setUsername("username");??
?????factory.setPassword("password");??
?????//設置Vhost,需要在控制臺先創建??
?????factory.setVirtualHost("test");??
?????//基于網絡環境合理設置超時時間??
?????factory.setConnectionTimeout(30?*?1000);??
?????factory.setHandshakeTimeout(30?*?1000);??
?????factory.setShutdownTimeout(0);??
?????Connection?connection?=?factory.newConnection();??
?????Channel?channel?=?connection.createChannel();??
???? //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。??
?????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);??
?????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");??
?????Consumer?consumer?=?new?DefaultConsumer(channel)?{??
??????@Override??
??????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{??
????????String?message?=?new?String(body,?StandardCharsets.UTF_8);??
????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
?????????}??
?????};??
?????channel.basicConsume(QUEUE_NAME,?true,?consumer);??
???}??
}??
完成上述步驟后,可以在控制臺查看消費者是否啟動成功。
完成以上所有步驟后,就成功接入了RabbitMQ服務,可以用消息隊列進行消息發送和訂閱了。