最近更新时间:2022-07-18 19:16:23
本文介绍使用SDK完成生产、消费
将集群所有Broker分别绑定弹性IP,具体操作是在云服务控制台绑定弹性IP,在云服务器控制台根据主机名搜索所有节点,设置方法详见更换IP。
选择创建集群时所选的安全组,并在该安全组内配置外网访问入站的ACL。开放对应客户端外网IP的访问,端口为9092
1.在客户端主机上配置/etc/hosts
文件 需要将master1上的/etc/hosts
文件内容添加至客户端主机上,并将每个ip替换成公网IP地址。
# kafka
120.92.33.214 kmr-0ba43848-gn-73607e20-master-1-001.ksc.com kmr-0ba43848-gn-73607e20-master-1-001
120.92.93.108 kmr-0ba43848-gn-73607e20-core-1-001.ksc.com kmr-0ba43848-gn-73607e20-core-1-001
120.131.13.117 kmr-0ba43848-gn-73607e20-core-1-002.ksc.com kmr-0ba43848-gn-73607e20-core-1-002
2.添加maven依赖。
// 消息队列 Kafka 版服务端版本为 0.10.1.0,建议的客户端版本为 0.10.2.2
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
3.客户端添加配置文件。
##配置节点列表,即控制台中集群详情页中,节点配置项所有节点的内网ip列表,如120.92.33.214:6667,120.92.93.108:6667,120.131.13.117:6667
bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
##配置 Topic,可以在Kafka
topic=alikafka-topic-demo
##配置 Consumer Group
group.id=group-demo
加载配置文件:
public class JavaKafkaConfigurer {
private static Properties properties;
public synchronized static Properties getKafkaProperties() {
if (null != properties) {
return properties;
}
//获取配置文件 kafka.properties 的内容
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
// 没加载到文件,程序要考虑退出
e.printStackTrace();
}
properties = kafkaProperties;
return kafkaProperties;
}
}
# 修改 <ip> <port> <topic_name> 即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开
message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()
# 修改 <ip> <port> <topic_name> <group_id> 即可, <group_id> 根据业务自定义
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'<topic_name>',
group_id = "<group_id>",
bootstrap_servers = ['<ip>:<port>']) #多个ip以逗号隔开
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
# 修改 <ip> <port> <topic_name> 即可
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_username = "<username>",
sasl_plain_password = "<password>",
)
message = "Hello kafka! Message test!"
msg = json.dumps(message).encode()
producer.send('<topic_name>',value = msg)
print("produce message " + message + " success.");
producer.close()
# 修改 <ip> <port> <topic_name> <group_id> 即可, <group_id> 根据业务自定义
# 填入 ACL 用户名密码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'<topic_name>',
group_id = "<group_id>",
bootstrap_servers = ['<ip>:<port>'], #多个ip以逗号隔开
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_username = "<username>",
sasl_plain_password = "<password>",
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
纯净模式