最近更新时间:2024-10-31 14:57:52
在控制台确认远程连接端口,ACL默认关闭
nameserver 端口配置项: rocketmqBrokerNamesrvAddrListenPort 默认为9876
dashboard 端口配置项: rocketmqDashboardServerPort 默认为9958
同VPC连接:进入主机列表侧边栏确定RocketMq集群IP, 选择内网IP即可,VPC内无需添加安全组规则
跨vpc连接:需点击主机列表到云主机控制台绑定公网IP(部署了broker所在的节点),并添加安全组9876,9958 (根据配置中心对应修改)入站规则。
EIP 和 安全组 编辑方式如下:
首先点击主机列表中对应 broker 节点的主机名跳转至主机控制台
然后在主机控制台绑定弹性 IP,添加安全组规则,一个集群添加一次规则即可
在控制台开启公网访问之后,点击访问RocketMQ UI 可查看 和 管理Topic、消费者组等
WebUI 使用教程:
https://rocketmq.apache.org/zh/docs/4.x/deployment/03Dashboard
如内网访问,可不开启,直接访问 master1内网IP: dashboard 端口即可,端口默认 9958,可以在控制台配置管理中查询修改端口,修改后重启 dashboard 生效,公网访问在控制台 开启WebUI即可
如果需要使用ACL功能,可在控制台开启ACL,开启后需重启组件生效
先创建用户,创建后约 1 分钟实时生效,无需重启 Broker
在编辑权限,编辑后约 1 分钟实时生效,无需重启 Broker
rockermq 权限问题可参考官方文档:
https://rocketmq.apache.org/zh/docs/4.x/bestPractice/04access/
基本示例中提供了以下两个功能
RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
RocketMQ可以用来消费消息。
Maven:
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
<!-- 如开启 acl 则添加以下依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.2</version>
</dependency>
如开启了 ACL, 初始化时需要带上2.4小节中填入的用户名密码,示例如下:
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("your nameserver IP:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send message to one of brokers
SendResult sendResult = producer.send(msg);
// Check whether the message has been delivered by the callback of sendResult
System.out.printf("%s%n", sendResult);
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
异步传输通常用于响应时间敏感的业务场景。这意味着发送方无法等待代理的响应太长时间。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback: receive the callback of the asynchronous return result.
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
单向传输用于需要中等可靠性的情况,如日志收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("your nameserver IP:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send in one-way mode, no return result
producer.sendOneway(msg);
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses
consumer.setNamesrvAddr("your nameserver IP:9876");
// Subscribe one or more topics and tags for finding those messages need to be consumed
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// Mark the message that have been consumed successfully
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
纯净模式