目前提供Kafka專享版實例的服務,Kafka專享版實例采用物理隔離的方式部署,租戶獨占Kafka實例。創建Kafka專享版實例之后,使用開源Kafka客戶端向Kafka專享版實例生產消息和消費消息。
本章節介紹如何使用開源的Kafka客戶端訪問未開啟SASL的Kafka專享實例方法。
多語言客戶端使用請參考Kafka官網:
//cwiki.apache.org/confluence/display/KAFKA/Clients
說明:Kafka服務器允許客戶端單IP連接的個數為200個,如果超過了,會出現連接失敗問題。
已配置正確的安全組。
訪問未開啟SASL的Kafka專享實例時,支持VPC內訪問。實例需要配置正確的安全組規則,具體安全組配置要求,請參考下表。
已獲取連接Kafka專享版實例的地址。
使用VPC內訪問,實例端口為9092,實例連接地址獲取如下圖。
獲取VPC內訪問Kafka專享實例的連接地址(實例未開啟SASL)
Kafka專享實例已創建Topic,否則請提前創建Topic。
已下載Kafka命令行工具1.1.0版本或者Kafka命令行工具2.3.0版本,確保Kafka實例版本與命令行工具版本相同。
已在Kafka命令行工具的使用環境中安裝Java Development Kit 1.8.111或以上版本,并完成環境變量配置。
以下操作命令以Linux系統為例進行說明:
解壓Kafka命令行工具。
進入文件壓縮包所在目錄,然后執行以下命令解壓文件。
tar -zxf [kafka_tar]
其中,[kafka_tar]表示命令行工具的壓縮包名稱。
例如:
tar -zxf kafka_2.11-1.1.0.tgz
進入Kafka命令行工具的“/bin”目錄下。
注意,Windows系統下需要進入“/bin/windows”目錄下。
執行如下命令進行生產消息。
./kafka-console-producer.sh --broker-list ${連接地址} --topic ${Topic名稱}
參數說明如下:
連接地址:從前提條件中獲取的連接地址。
Topic名稱:Kafka實例下創建的Topic名稱。
以獲取的Kafka實例連接地址為“10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092”為例。執行完命令后輸入內容,按“Enter”發送消息到Kafka實例,輸入的每一行內容都將作為一條消息發送到Kafka實例。
[root@ecs-kafka bin]# ./kafka-console-producer.sh --broker-list 10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092 --topic topic-demo
>Hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]#
如需停止生產使用Ctrl+C命令退出。
執行如下命令消費消息。
./kafka-console-consumer.sh --bootstrap-server ${連接地址} --topic ${Topic名稱} --group ${消費組名稱} --from-beginning
參數說明如下:
連接地址:從前提條件中獲取的連接地址。
Topic名稱:Kafka實例下創建的Topic名稱。
消費組名稱:根據您的業務需求,設定消費組名稱。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.3.196.45:9092,10.78.42.127:9092,10.4.49.103:9092 --topic topic-demo --group order-test --from-beginning
Kafka!
DMS
Hello
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消費使用Ctrl+C命令退出。