Python
更新時間 2024-02-05 11:53:37
最近更新時間: 2024-02-05 11:53:37
分享文章
環境安裝
-
安裝Python。(Python版本為2.7或3.X。)
-
安裝依賴庫。(使用公網連接需要安裝confluent-kafka 1.9.2及以下版本的依賴庫)
pip install confluent-kafka==1.9.2 -
下載Demo包kafka-confluent-python-demo.zip。
配置修改
-
如果是ssl連接,需要在控制臺下載證書。并且解壓壓縮包得到ssl.client.truststore.jks,執行以下命令生成caRoot.pem文件。
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12 openssl pkcs12 -in caRoot.p12 -out caRoot.pem -
修改setting.py文件。(ca_location僅在ssl連接時需要配置)
kafka_setting = { 'bootstrap_servers': 'XXX', 'topic_name': 'XXX', 'group_name': 'XXX' }
生產消息
發送以下命令發送消息。
python kafka_producer.py
生產消息示例代碼如下:
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
"""初始化一個 Producer 對象"""
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
"""異步發送消息"""
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
"""在程序結束時,調用flush"""
p.flush()
消費消息
發送以下命令消費消息。
python kafka_consumer.py
消費消息示例代碼如下:
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
'bootstrap.servers': conf['bootstrap_servers'],
'group.id': conf['group_name'],
'auto.offset.reset': 'latest'
})
c.subscribe([conf['topic_name']])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()