全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

使用SDK收发消息

最近更新时间:2025-09-28 17:16:12

本文以Python SDK作为示例展示如何使用SDK收发Kafka消息。其他语言可参考Kafka开发指南。


基础环境

pip install kafka-python

高级功能请参考官方文档

注意:
同vpc内:在控制台开启内网DNS之后,bootstrap_servers 填内网主机名即可。
例如:bootstrap_servers=['kmr-f688dc0c-gn-7e00d74c-broker-1-1:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-2', 'kmr-f688dc0c-gn-7e00d74c-broker-1-3:9092']
不同vpc:   bootstrap_servers 填主机名,然后hosts文件添加主机名映射外网eip的hosts。
例如:bootstrap_servers=['kmr-f688dc0c-gn-7e00d74c-broker-1-1:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-2:9092', 'kmr-f688dc0c-gn-7e00d74c-broker-1-3:9092']
添加hosts: 
例如:
10.69.75.98   kmr-f688dc0c-gn-7e00d74c-broker-1-1
10.69.72.132  kmr-f688dc0c-gn-7e00d74c-broker-1-2
10.69.72.175  kmr-f688dc0c-gn-7e00d74c-broker-1-3

简单生产示例

# 修改 <ip> <port> <topic_name> 即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开

message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()

简单消费示例

# 修改 <ip> <port> <topic_name> <group_id>  即可, <group_id> 根据业务自定义
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    group_id = "<group_id>",
    bootstrap_servers = ['<ip>:<port>'])  #多个ip以逗号隔开

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
    (message.topic, message.partition, message.offset, message.value))

ACL 生产消费示例

# 修改 <ip> <port> <topic_name> 即可
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = "SCRAM-SHA-256",
    sasl_plain_username = "<username>",
    sasl_plain_password = "<password>",
    )

message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()

# 修改 <ip> <port> <topic_name> <group_id>  即可, <group_id> 根据业务自定义
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    group_id = "<group_id>",
    bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = "SCRAM-SHA-256",
    sasl_plain_username = "<username>",
    sasl_plain_password = "<password>",
    )  

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
    (message.topic, message.partition, message.offset, message.value))

如需添加未生成的消费者组,可登录主机添加消费者组ACL策略(替换<>对应值):

kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:< your_user_name> --allow-host * --operation All --group <your_group>

查看添加结果:

kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --list

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈