最近更新时间:2025-09-28 17:16:12
本文以Python SDK作为示例展示如何使用SDK收发Kafka消息。其他语言可参考Kafka开发指南。
pip install kafka-python
高级功能请参考官方文档。
注意:
同vpc内:在控制台开启内网DNS之后,bootstrap_servers 填内网主机名即可。
例如:bootstrap_servers=['kmr-f688dc0c-gn-7e00d74c-broker-1-1:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-2', 'kmr-f688dc0c-gn-7e00d74c-broker-1-3:9092']
不同vpc: bootstrap_servers 填主机名,然后hosts文件添加主机名映射外网eip的hosts。
例如:bootstrap_servers=['kmr-f688dc0c-gn-7e00d74c-broker-1-1:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-2:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-3:9092']
添加hosts:
例如:
10.69.75.98 kmr-f688dc0c-gn-7e00d74c-broker-1-1
10.69.72.132 kmr-f688dc0c-gn-7e00d74c-broker-1-2
10.69.72.175 kmr-f688dc0c-gn-7e00d74c-broker-1-3# 修改 <ip> <port> <topic_name> 即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开
message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()# 修改 <ip> <port> <topic_name> <group_id> 即可, <group_id> 根据业务自定义
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'<topic_name>',
group_id = "<group_id>",
bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))# 修改 <ip> <port> <topic_name> 即可
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_username = "<username>",
sasl_plain_password = "<password>",
)
message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()# 修改 <ip> <port> <topic_name> <group_id> 即可, <group_id> 根据业务自定义
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'<topic_name>',
group_id = "<group_id>",
bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_username = "<username>",
sasl_plain_password = "<password>",
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))如需添加未生成的消费者组,可登录主机添加消费者组ACL策略(替换<>对应值):
kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:< your_user_name> --allow-host * --operation All --group <your_group>
查看添加结果:
kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --list
纯净模式
