• 热门
  • 基础
  • 数据库
  • 安全
  • 大数据
  • 人工智能
  • 混合云
  • 开发与运维
  • 企业应用

应用服务

行业引擎

全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

公网访问

最近更新时间:2022-07-18 19:16:23

本文介绍使用SDK完成生产、消费

基础环境

绑定公网IP

将集群所有Broker分别绑定弹性IP,具体操作是在云服务控制台绑定弹性IP,在云服务器控制台根据主机名搜索所有节点,设置方法详见更换IP

开放安全组端口

选择创建集群时所选的安全组,并在该安全组内配置外网访问入站的ACL。开放对应客户端外网IP的访问,端口为9092

示例:操作指南

1.在客户端主机上配置/etc/hosts文件 需要将master1上的/etc/hosts文件内容添加至客户端主机上,并将每个ip替换成公网IP地址。

# kafka
120.92.33.214 kmr-0ba43848-gn-73607e20-master-1-001.ksc.com kmr-0ba43848-gn-73607e20-master-1-001
120.92.93.108 kmr-0ba43848-gn-73607e20-core-1-001.ksc.com kmr-0ba43848-gn-73607e20-core-1-001
120.131.13.117 kmr-0ba43848-gn-73607e20-core-1-002.ksc.com kmr-0ba43848-gn-73607e20-core-1-002

2.添加maven依赖。

// 消息队列 Kafka 版服务端版本为 0.10.1.0,建议的客户端版本为 0.10.2.2
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>

3.客户端添加配置文件。

##配置节点列表,即控制台中集群详情页中,节点配置项所有节点的内网ip列表,如120.92.33.214:6667,120.92.93.108:6667,120.131.13.117:6667
bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
##配置 Topic,可以在Kafka
topic=alikafka-topic-demo 
##配置 Consumer Group
group.id=group-demo

加载配置文件:

public class JavaKafkaConfigurer {
    private static Properties properties;
    public synchronized static Properties getKafkaProperties() {
       if (null != properties) {
           return properties;
       }
       //获取配置文件 kafka.properties 的内容
       Properties kafkaProperties = new Properties();
       try {
           kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
       } catch (Exception e) {
           // 没加载到文件,程序要考虑退出
           e.printStackTrace();
       }
       properties = kafkaProperties;
       return kafkaProperties;
    }
}
  1. 使用Python SDK生产消息
# 修改 <ip> <port> <topic_name> 即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开

message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()

  1. 使用Python SDK消费消息。
# 修改 <ip> <port> <topic_name> <group_id>  即可, <group_id> 根据业务自定义
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    group_id = "<group_id>",
    bootstrap_servers = ['<ip>:<port>'])  #多个ip以逗号隔开

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
    (message.topic, message.partition, message.offset, message.value))

  1. 开启ACL使用Python SDK生产消息
# 修改 <ip> <port> <topic_name> 即可
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = "SCRAM-SHA-256",
    sasl_plain_username = "<username>",
    sasl_plain_password = "<password>",
    )

message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()

  1. 开启ACL使用Python SDK消费消息
# 修改 <ip> <port> <topic_name> <group_id>  即可, <group_id> 根据业务自定义
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    group_id = "<group_id>",
    bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
    security_protocol = "SASL_PLAINTEXT",
    sasl_mechanism = "SCRAM-SHA-256",
    sasl_plain_username = "<username>",
    sasl_plain_password = "<password>",
    )  

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
    (message.topic, message.partition, message.offset, message.value))

纯净模式常规模式

纯净模式

点击可全屏预览文档内容

鼠标选中内容,快速反馈问题

如果在文档使用中出现问题,可选中有问题的部分进行快速反馈,我们将跟进处理。
不再提示
好的,我知道了

聆听反馈