全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

RocketMQ最佳实践

最近更新时间:2024-10-31 14:57:52

1. 确认端口

在控制台确认远程连接端口,ACL默认关闭

nameserver 端口配置项: rocketmqBrokerNamesrvAddrListenPort 默认为9876

dashboard 端口配置项: rocketmqDashboardServerPort 默认为9958

2. 确认IP

同VPC连接:进入主机列表侧边栏确定RocketMq集群IP, 选择内网IP即可,VPC内无需添加安全组规则

跨vpc连接:需点击主机列表到云主机控制台绑定公网IP(部署了broker所在的节点),并添加安全组9876,9958 (根据配置中心对应修改)入站规则。

EIP 和 安全组 编辑方式如下:

首先点击主机列表中对应 broker 节点的主机名跳转至主机控制台

然后在主机控制台绑定弹性 IP,添加安全组规则,一个集群添加一次规则即可

3. 确认RocketMQ 集群Topic以及消费者组信息

在控制台开启公网访问之后,点击访问RocketMQ UI 可查看 和 管理Topic、消费者组等

WebUI 使用教程:

https://rocketmq.apache.org/zh/docs/4.x/deployment/03Dashboard

3.1 开启 WebUI

如内网访问,可不开启,直接访问 master1内网IP: dashboard 端口即可,端口默认 9958,可以在控制台配置管理中查询修改端口,修改后重启 dashboard 生效,公网访问在控制台 开启WebUI即可

3.2 创建 Topic

3.3 创建 Group

4. RocketMQ ACL

如果需要使用ACL功能,可在控制台开启ACL,开启后需重启组件生效

先创建用户,创建后约 1 分钟实时生效,无需重启 Broker

在编辑权限,编辑后约 1 分钟实时生效,无需重启 Broker

rockermq 权限问题可参考官方文档:

https://rocketmq.apache.org/zh/docs/4.x/bestPractice/04access/

5. 基础示例

基本示例中提供了以下两个功能

  • RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

  • RocketMQ可以用来消费消息。

5.1 添加依赖

Maven:

<!-- in your <dependencies> block -->
  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.9.2</version>
  </dependency>
  
<!-- 如开启 acl 则添加以下依赖 -->
  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-acl</artifactId>
      <version>4.9.2</version>
  </dependency>

5.2 发送消息

如开启了 ACL, 初始化时需要带上2.4小节中填入的用户名密码,示例如下:

// 实例化消息生产者Producer
   DefaultMQProducer producer = new DefaultMQProducer(
       groupName,
       new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
   );
5.2.1使用Producer发送同步消息

可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("your nameserver IP:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
      // Send message to one of brokers
      SendResult sendResult = producer.send(msg);
      // Check whether the message has been delivered by the callback of sendResult
      System.out.printf("%s%n", sendResult);
    }
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  }
}
5.2.2 发送异步消息

异步传输通常用于响应时间敏感的业务场景。这意味着发送方无法等待代理的响应太长时间。

public class AsyncProducer {
  public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
      final int index = i;
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
      // SendCallback: receive the callback of the asynchronous return result.
      producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
          System.out.printf("%-10d OK %s %n", index,
            sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
          System.out.printf("%-10d Exception %s %n", index, e);
          e.printStackTrace();
        }
      });
    }
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  }
}
5.2.3 以单向模式发送消息

单向传输用于需要中等可靠性的情况,如日志收集。

public class OnewayProducer {
  public static void main(String[] args) throws Exception{
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("your nameserver IP:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
      );
      // Send in one-way mode, no return result
      producer.sendOneway(msg);
    }
    // Shut down once the producer instance is not longer in use
     producer.shutdown();
  }
}

5.3 消费消息

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // Instantiate with specified consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
    // Specify name server addresses
    consumer.setNamesrvAddr("your nameserver IP:9876");

    // Subscribe one or more topics and tags for finding those messages need to be consumed
    consumer.subscribe("TopicTest", "*");
    // Register callback to execute on arrival of messages fetched from brokers
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // Mark the message that have been consumed successfully
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // Launch the consumer instance
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈