全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Python SDK

最近更新时间:2025-08-18 17:22:24

VPC网络接入

该任务以 Python 客户端为例指导您使用 VPC 网络接入托管 Kafka 并收发消息

前提条件

安装 Python

安装 pip

操作步骤

将下载的 Demo 中的 pythonkafkademo 上传至 Linux 服务器,登录 Linux 服务器,进入 pythonkafkademo 目录。

步骤1:添加 Python 依赖库

执行以下命令安装:

pip install kafka-python

步骤2:生产消息

  1. 修改生产消息程序 producer.py 中配置参数。

#coding:utf8
from kafka import KafkaProducer
import json
producer = KafkaProducer(
   bootstrap_servers = ['$domainName:$port'],
   api_version = (0,10,0)
)
message = "Hello World! Hello kafka!"
msg = json.dumps(message).encode()
producer.send('topic_name',value = msg)
print("produce message " + message + " success.")
producer.close()

参数

描述

bootstrap_servers

接入网络,在控制台的集群详情页面主机列表页复制

topic_name

Topic 名称,您可以在控制台上kafka eagle webui 管理页面复制。

  1. 编译并运行 producer.py。

  1. 查看运行结果。

在 kafka 的 kafka eagle webui 管理页面,选择对应的 Topic,查看刚刚发送的消息。

步骤3:消费消息

  1. 修改消费消息程序 consumer.py 中配置参数。

#coding:utf8
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '$topic_name',
    group_id = "$group_id",
    bootstrap_servers = ['$domainName:$port'],
    api_version = (0,10,0)
)

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))

参数

描述

bootstrap_servers

接入网络,在控制台的集群详情页面主机列表页复制

group_id

消费者的组 ID,根据业务需求自定义

topic_name

Topic名称,您可以在控制台上kafka eagle webui 管理页面复制。

  1. 编译并运行 consumer.py。

  2. 查看运行结果。

在Kakfa控制台 的Consumer Group页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。

公网 SASL_PLAINTEXT 方式接入

该任务以 Python 客户端为例,指导您使用公网 SASL_PLAINTEXT 方式接入Kafka 并收发消息。

前提条件

操作步骤

步骤1:准备工作

  1. 创建接入点。

i.在 集群列表 页面,单击目标集群 ID,进入集群详情页。

ii.在kafka 服务管理页面开启ACL并滚动重启Broker

  1. 用户管理页面新建角色,设置密码。

  2. 在控制台 kafka eagle webui页面新建 Topic

  3. 添加 Python 依赖库

执行以下命令安装

pip install kafka-python

步骤2:添加配置文件

  1. 修改生产消息程序 producer.py 中配置参数。

producer = KafkaProducer(
    bootstrap_servers = ['xx.xx.xx.xx:port'],
    api_version = (1, 1),

    #
    # SASL_PLAINTEXT 公网接入
    #
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = "PLAIN",
    sasl_plain_username = "instanceId#username",
    sasl_plain_password = "password",
)

message = "Hello World! Hello kafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)
print("produce message " + message + " success.")
producer.close()

参数

描述

bootstrap_servers

接入网络,在控制台的集群详情页面主机列表页复制

sasl_plain_username

用户名,格式为 用户名。集群 ID 在 Kafka 控制台 的集群详情页面的基本信息获取,用户在用户管理创建用户时设置。

sasl_plain_username

用户密码,在 Kafka 控制台集群详情页面的用户管理创建用户时设置。

topic_name

Topic 名称,您可以在控制台上kafka eagle webui 管理页面复制。

  1. 编译并运行 producer.py。

  2. 查看运行结果。

在 kafka 的 kafka eagle webui 管理页面,选择对应的 Topic , 查看刚刚发送的消息。

步骤3:消费消息

  1. 修改消费消息程序 consumer.py 中配置参数。

consumer = KafkaConsumer(
    'topic_name',
    group_id = "group_id",
    bootstrap_servers = ['xx.xx.xx.xx:port'],
    api_version = (1,1),

    #
    # SASL_PLAINTEXT 公网接入
    #
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = 'PLAIN',
    sasl_plain_username = "instanceId#username",
    sasl_plain_password = "password",
)

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
    (message.topic, message.partition, message.offset, message.value))

参数

描述

bootstrap_servers

接入网络,在控制台的集群详情页面主机列表页复制

group_id

消费者的组 ID,根据业务需求自定义。

sasl_plain_username

用户名在Kafka 控制台获取,用户在用户管理创建用户时设置

sasl_plain_password

用户名密码,在 Kafka 控制台集群详情页面的用户管理创建用户时设置

topic_name

Topic 名称,您可以在控制台上 kafka eagle webui 管理页面复制。

  1. 编译并运行 consumer.py。

  2. 查看运行结果。

在 kafka控制台的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈