你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

JAVA —— RocketMQ ①

2021-12-5 19:16:20

文章目录

  • 今日内容
  • 1 概述
  • 2.MQ 的作用
  • 3.MQ的缺点
  • 4.常见产品
  • 5.RocketMQ入门
    • 5.1 基础概念
      • 1 消息生产者(Producer)
      • 2 消息消费者(Consumer)
      • 3 拉取式消费(Pull Consumer)
      • 4 推动式消费(Push Consumer)
      • 5 消息(Message)
      • 6 主题(Topic)
      • 7 标签(Tag)
      • 8 代理服务器(Broker Server)
      • 9 名字服务(Name Server)
    • 5.1 JDK
    • 5.2 RocketMQ
  • 6.消息发送
    • 6.1 消息发送(OneToOne)
    • 6.2 广播模式
    • 6.3 发送者发送消息的类型 三种
    • 6.4 延时消息
    • 6.5 批量消息
    • 6.6 Tag过滤消息
    • 6.7 sql过滤消息
    • 6.8 顺序消息
      • 6.8.1 错乱的消息顺序
      • 6.8.2 顺序消息实现
    • 6.9 事务消息
  • ----最佳实践-------扩展资料--------
    • 1 生产者
      • 1.1 发送消息注意事项
        • 1 Tags的使用
        • 2 Keys的使用
        • 3 日志的打印
      • 1.2 消息发送失败处理方式
      • 1.3选择oneway形式发送
    • 2 消费者
      • 2.1 消费过程幂等
      • 2.2 消费速度慢的处理方式
        • 1 提高消费并行度
        • 2 批量方式消费
        • 3 跳过非重要消息
        • 4 优化每条消息消费过程
      • 2.3 消费打印日志
      • 2.4 其他消费建议
        • 1 关于消费者和订阅
        • 2 关于有序消息
        • 3 关于并发消费
        • 4 关于消费状态Consume Status
        • 5 关于Blocking
        • 6 关于线程数设置
        • 7 关于消费位点
    • 3 Broker
      • 3.1 Broker 角色
      • 3.2 FlushDiskType
      • 3.3 Broker 配置
    • 4 NameServer
    • 5 客户端配置
      • 5.1 客户端寻址方式
      • 5.2 客户端配置
        • 1 客户端的公共配置
        • 2 Producer配置
        • 3 PushConsumer配置
        • 4 PullConsumer配置
        • 5 Message数据结构
    • 6 系统配置
      • 6.1 JVM选项
      • 6.2 Linux内核参数


今日内容

  1. MQ简介
  2. 环境搭建
  3. RocketMQ消息

1 概述

MQ(Message Queue)消息队列,是一种用来保存消息数据的队列

队列:数据结构的一种,特征为 “先进先出”

MQ的优势

在这里插入图片描述

在这里插入图片描述

2.MQ 的作用

  • 应用解耦(异步发送消息 )

在这里插入图片描述

在这里插入图片描述

  • 快速应用变更维护

在这里插入图片描述

  • 流量削锋(削峰填谷)
    在这里插入图片描述

3.MQ的缺点

缺点:

  • 系统可用性降低:集群
  • 系统复杂度提高:程序员提升水平
  • 异步消息机制
    • 消息顺序性
    • 消息丢失
    • 消息一致性
    • 消息重复消费

4.常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据领域较多

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

5.RocketMQ入门

5.1 基础概念

1 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。

2 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

3 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。

4 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

5 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

6 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。比如订单类消息、京豆类消息

7 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。比如订单类消息又分为:实物类商品和虚拟类(话费)

8 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

9 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

在这里插入图片描述

工作流程:

​ 1、启动NameServer,NameServer起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心,也类似zookeeper。

​ 2、Broker启动,跟所有NameServer保持长连接,定时发送心跳包,心跳包中包含当前Broker信息(IP+端口等)以及储存所有topic信息。注册成功后,NameServer集群中就有Topic(tag)跟Broker的映射关系。

​ 3、收发消息前,先创建topic,创建topic时需要指定该topic(tag)要存储在哪些Broker上。也可以在发送消息时自动创建Topic。

​ 4、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic(tag)存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。

​ 5、Consumer跟Producer类似。跟其中一台NameServer建立长连接,获取当前订阅Topic(tag)存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。

5.1 JDK

1)解压 jdk
tar -zxvf jdk-8u171-linux-x64.tar.gz
2)配置环境变量
>vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin
3)重新加载配置
>source /etc/profile
>java -version

错误解决

如果安装完毕 jdk 后  java -version 看到的是 openjdk(需要删除)
因为 操作系统默认已经安装了 opendjdk,
# 查看
rpm -qa | grep java
# 删除(把上一个命令看到的所有的jdk文件 用 如下命令删除)
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64

在这里插入图片描述

5.2 RocketMQ

# 解压
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
# 调整启动内存为128m
runserver.sh
runbroker.sh

在这里插入图片描述

如果(docker 一起 需要修改)

conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUS
# 解决和docker 冲突的
brokerIP1=192.168.52.128
namesrvAddr=192.168.52.128:9876

启动

#启动nameserv
sh mqnamesrv
# 启动mq  服务  -n 指定 nameserv 的地址(bin)
systemctl disable firewalld.service 
# 关闭防火墙
systemctl stop firewalld.service 

测试

export NAMESRV_ADDR=localhost:9876

#切换到bin 目录
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

6.消息发送

消息发送与接收开发流程

1.谁来发?

2.发给谁?

3.怎么发?

4.发什么?

5.发的结果是什么?

6.打扫战场

6.1 消息发送(OneToOne)

单生产者单消费者(OneToOne)

  1. 环境搭建
 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.5.2</version>
</dependency>

2.发送消息,消息格式如下所示:

在这里插入图片描述

public static void main(String[] args) throws Exception {
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.31.80:9876");
        //3.1启动发送的服务
        producer.start();
        //4.创建要发送的消息对象,指定topic,指定内容body
        Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
        //4.2发送消息
        SendResult result = producer.send(msg);
        System.out.println("返回结果:"+result);
        //5.关闭连接
        producer.shutdown();
}

注意:关闭服务器防火墙

#关闭
systemctl stop firewalld.service 
#禁用开机启动
systemctl disable firewalld.service 

//发送多个消息 
for (int i = 1; i <= 10; i++) {
 Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
}

3.消费者

public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.31.80:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for(MessageExt msg : list){
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已开启运行");
    }

6.2 广播模式

发送者

同上

消费者

		//1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        System.out.println(consumer.getInstanceName());
        //consumer.setInstanceName("instance01");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.31.80:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");

        //设置当前消费者的消费模式(默认模式:负载均衡)
       // consumer.setMessageModel(MessageModel.CLUSTERING);

        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
         consumer.setMessageModel(MessageModel.BROADCASTING);

        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for(MessageExt msg : list){
//                  System.out.println("收到消息:"+msg);
                    System.out.println("消费者1:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接收消息服务已开启运行");

广播模式的现象
  1) 如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
  2) 如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论:
必须先启动消费者再启动发送者才有广播的效果

启动多个消费者

在这里插入图片描述

6.3 发送者发送消息的类型 三种

  • 同步消息

    特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

在这里插入图片描述

SendResult result = producer.send(msg);

  • 异步消息

    特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

在这里插入图片描述

//回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行
producer.send(msg, new SendCallback() {
    //表示成功返回结果
    public void onSuccess(SendResult sendResult) {
        System.out.println(sendResult);
    }

    //表示发送消息失败
    public void onException(Throwable t) {
        System.out.println(t);
    }
});

  • 单向消息
    特征:不需要有回执的消息,例如日志类消息

在这里插入图片描述

producer.sendOneway(msg);

完整代码如下:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.52.128:9876");
    producer.start();
    for (int i = 1; i <= 5; i++) {
        //同步消息发送
        Message msg = new Message("topic2",
                ("同步消息:hello rocketmq " + i).getBytes("UTF-8"));
        SendResult result = producer.send(msg);
        System.out.println("返回结果:" + result);

        //异步消息发送
        //Message msg = new Message("topic2",
        //        ("异步消息:hello rocketmq " + i).getBytes("UTF-8"));
        //producer.send(msg, new SendCallback() {
        //    //表示成功返回结果
        //    public void onSuccess(SendResult sendResult) {
        //        System.out.println(sendResult);
        //    }
        //
        //    //表示发送消息失败
        //    public void onException(Throwable t) {
        //        System.out.println(t);
        //    }
        //});

        //单向消息
        //Message msg = new Message("topic2", ("单向消息:hello rocketmq " + i).getBytes(
        //        "UTF-8"));
        //producer.sendOneway(msg);
    }
    //添加一个休眠操作,确保异步消息返回后能够输出
    TimeUnit.SECONDS.sleep(10);

    producer.shutdown();
}

6.4 延时消息

延迟消息是指生产者发送消息后,不能立刻被消费者消费,需要等待指定的时间后才可以被消费

应用场景 
在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后consumer收到这条订单消息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了

延时消息格式:

Message msg = new Message("topic3",
        ("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置当前消息的延时效果
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的延迟消息时间

  • 秒级:1,5,10,30

  • 分级:1~10,20,30

  • 时级:1,2

  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    分别代表延迟level1-level18

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.52.128:9876");
    consumer.subscribe("topic3", "*");

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : list) {
                System.out.println("消息:" + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.println("接收消息服务已开启运行");
}

6.5 批量消息

发送批量消息

List<Message> msgList = new ArrayList<Message>();
SendResult send = producer.send(msgList);

注意:

消息内容总长度不超过4M
 消息内容总长度包含如下:
    topic(字符串字节数)
    body (字节数组长度)
    消息追加的属性(key与value对应字符串字节数)
    日志(固定20字节)

6.6 Tag过滤消息

生产者

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.31.80:9876");
        producer.start();

        //创建消息的时候除了制定topic,还可以指定tag
        Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 											2").getBytes("UTF-8"));

        SendResult send = producer.send(msg);
        System.out.println(send);

        producer.shutdown();
    }

消费者

*代表任意tag

“tag1 || tag2” 代表两个 tag 那个都行

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
//consumer.subscribe("topic6","*");

6.7 sql过滤消息

也可以叫做属性过滤/语法过滤/SQL过滤

生产者

Message msg = new Message("topic7",("消息过滤按照sql:hello rocketmq").getBytes("UTF-8"));
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.52.128:9876");
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));

注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

启动服务器

sh mqbroker -n localhost:9876 -c ../conf/broker.conf

6.8 顺序消息

6.8.1 错乱的消息顺序

默认情况下,MQ 开启了多个队列, 同时发送多个消息的的话,发送给那个队列是不确定的,同时消息的消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性,

在这里插入图片描述

6.8.2 顺序消息实现

想要保证消息的有序性,需要指定消息的队列,同时消息的消费者应该一个队列开启一个线程进行接收而不是一个消息一个线程。实现方式如下:

发送者

   public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.31.80:9876");
        producer.start();

        //创建要执行的业务队列
        List<Order> orderList = new ArrayList<Order>();

        Order order11 = new Order();
        order11.setId("a");
        order11.setMsg("主单-1");
        orderList.add(order11);

        Order order12 = new Order();
        order12.setId("a");
        order12.setMsg("子单-2");
        orderList.add(order12);

        Order order13 = new Order();
        order13.setId("a");
        order13.setMsg("支付-3");
        orderList.add(order13);

        Order order14 = new Order();
        order14.setId("a");
        order14.setMsg("推送-4");
        orderList.add(order14);

        Order order21 = new Order();
        order21.setId("b");
        order21.setMsg("主单-1");
        orderList.add(order21);

        Order order22 = new Order();
        order22.setId("b");
        order22.setMsg("子单-2");
        orderList.add(order22);

        Order order31 = new Order();
        order31.setId("c");
        order31.setMsg("主单-1");
        orderList.add(order31);

        Order order32 = new Order();
        order32.setId("c");
        order32.setMsg("子单-2");
        orderList.add(order32);

        Order order33 = new Order();
        order33.setId("c");
        order33.setMsg("支付-3");
        orderList.add(order33);

        //设置消息进入到指定的消息队列中
        for(final Order order : orderList){
            Message msg = new Message("orderTopic",order.toString().getBytes());
            //发送时要指定对应的消息队列选择器
            SendResult result = producer.send(msg, new MessageQueueSelector() {
                //设置当前消息发送时使用哪一个消息队列
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    System.out.println(list.size());// 数量只能通过修改 mq 的配置 改变(阿里开发团队认为,这个是敏感资源需要服务器管理员控制,而不是编码人员控制)
                    //根据发送的信息不同,选择不同的消息队列
                    //根据id来选择一个消息队列的对象,并返回->id得到int值
                    int mqIndex = order.getId().hashCode() % list.size();
                    return list.get(mqIndex);
                }
            }, null);
            System.out.println(result);
        }

        producer.shutdown();
    }

接受消息

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.31.80:9876");
    consumer.subscribe("orderTopic","*");

    //使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
    consumer.registerMessageListener(new MessageListenerOrderly() {
        //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            for(MessageExt msg : list){
                System.out.println(Thread.currentThread().getName()+"  消息:"+new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.println("接收消息服务已开启运行");
}

6.9 事务消息

RocketMQ 也允许像mysql 一样发送具有事务特征的消息

MQ 的事务流程(本地代码正常执行)

在这里插入图片描述

MQ 的消息补偿过程(当本地代码执行失败时)

在这里插入图片描述

MQ 消息的三种状态

提交状态:允许进入队列,此消息与非事务消息无区别
回滚状态:不允许进入队列,此消息等同于未发送过
中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态)

注意:事务消息仅与生产者有关,与消费者无关

生产者代码代码

 public static void main1(String[] args) throws Exception {
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.184.128:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
               
               // 此处写本地事务处理业务
               // 如果成功,消息改为提交,如果失败改为 回滚,如果是多线程处理状态未知,就提交为未知等待事务补偿过程
               
               //事务提交状态
                return LocalTransactionState.COMMIT_MESSAGE;// 类似于msql 的 commit
            }
            //事务补偿过程
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        producer.shutdown();
    }

补偿代码

 public static void main(String[] args) throws Exception {
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.184.128:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //中间状态
                return LocalTransactionState.UNKNOW;
            }
            //事务补偿过程
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务补偿过程执行");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("topic11",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
//      producer.shutdown();
    }

在这里插入图片描述

----最佳实践-------扩展资料--------


1 生产者

1.1 发送消息注意事项

1 Tags的使用

一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。

2 Keys的使用

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

   // 订单Id   
   String orderId = "20034568923546";   
   message.setKeys(orderId);   

3 日志的打印

消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:

  • SEND_OK

消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。

  • FLUSH_DISK_TIMEOUT

消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

  • FLUSH_SLAVE_TIMEOUT

消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。

  • SLAVE_NOT_AVAILABLE

消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

1.2 消息发送失败处理方式

Producer的send方法本身支持内部重试,重试逻辑如下:

  • 至多重试2次(同步发送为2次,异步发送为0次)。
  • 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  • 如果本身向broker发送消息产生超时异常,就不会再重试。

以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,主要基于以下几点考虑:首先,MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。其次,如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失。第三,Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议重试过程交由应用来控制。

1.3选择oneway形式发送

通常消息的发送是这样一个过程:

  • 客户端发送请求到服务器
  • 服务器处理请求
  • 服务器向客户端返回应答

所以,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级。

2 消费者

2.1 消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

2.2 消费速度慢的处理方式

1 提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

2 批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

3 跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO 消息堆积情况的特殊处理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }    

4 优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插入【数据 3】
  • 向 DB 插入【数据 4】

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

2.3 消费打印日志

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

   public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }   

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

2.4 其他消费建议

1 关于消费者和订阅

第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。

2 关于有序消息

消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。

3 关于并发消费

顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。

4 关于消费状态Consume Status

对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。

5 关于Blocking

不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程

6 关于线程数设置

消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。

7 关于消费位点

当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

3 Broker

3.1 Broker 角色

Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。

3.2 FlushDiskType

SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。

3.3 Broker 配置

参数名默认值说明
listenPort10911接受客户端连接的监听端口
namesrvAddrnullnameServer 地址
brokerIP1网卡的 InetAddress当前 broker 监听的 IP
brokerIP2跟 brokerIP1 一样存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步
brokerNamenullbroker 的名称
brokerClusterNameDefaultCluster本 broker 所属的 Cluser 名称
brokerId0broker id, 0 表示 master, 其他的正整数表示 slave
storePathCommitLog$HOME/store/commitlog/存储 commit log 的路径
storePathConsumerQueue$HOME/store/consumequeue/存储 consume queue 的路径
mappedFileSizeCommitLog1024 * 1024 * 1024(1G)commit log 的映射文件大小
deleteWhen04在每天的什么时间删除已经超过文件保留时间的 commit log
fileReservedTime72以小时计算的文件保留时间
brokerRoleASYNC_MASTERSYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskTypeASYNC_FLUSHSYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。

4 NameServer

RocketMQ 中,Name Servers 被设计用来做简单的路由管理。其职责包括:

  • Brokers 定期向每个名称服务器注册路由数据。
  • 名称服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。

5 客户端配置

相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。

5.1 客户端寻址方式

RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。

  • 代码中指定Name Server地址,多个namesrv地址之间用分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

  • Java启动参数中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876  

  • 环境变量指定Name Server地址
export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876   

  • HTTP静态服务器寻址(默认)

客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:

192.168.0.1:9876;192.168.0.2:9876   

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67    jmenv.taobao.net   

推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。

5.2 客户端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其他参数同理。

1 客户端的公共配置

参数名默认值说明
namesrvAddrName Server地址列表,多个NameServer地址用分号隔开
clientIP本机IP客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
instanceNameDEFAULT客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads4通信层异步回调线程数
pollNameServerInteval30000轮询Name Server间隔时间,单位毫秒
heartbeatBrokerInterval30000向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval5000持久化Consumer消费进度间隔时间,单位毫秒

2 Producer配置

参数名默认值说明
producerGroupDEFAULT_PRODUCERProducer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKeyTBW102在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
defaultTopicQueueNums4在发送消息,自动创建服务器不存在的topic时,默认创建的队列数
sendMsgTimeout10000发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch4096消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOKFALSE如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
retryTimesWhenSendFailed2如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用
maxMessageSize4MB客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。
transactionCheckListener事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize1Broker回查Producer事务状态时,线程池最小线程数
checkThreadPoolMaxSize1Broker回查Producer事务状态时,线程池最大线程数
checkRequestHoldMax2000Broker回查Producer事务状态时,Producer本地缓冲请求队列大小
RPCHooknull该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。

3 PushConsumer配置

参数名默认值说明
consumerGroupDEFAULT_CONSUMERConsumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModelCLUSTERING消费模型支持集群消费和广播消费两种
consumeFromWhereCONSUME_FROM_LAST_OFFSETConsumer启动后,默认从上次消费的位置开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费
consumeTimestamp半个小时前只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作用。
allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance算法实现策略
subscription订阅关系
messageListener消息监听器
offsetStore消费进度存储
consumeThreadMin10消费线程池最小线程数
consumeThreadMax20消费线程池最大线程数
consumeConcurrentlyMaxSpan2000单队列并行消费允许的最大跨度
pullThresholdForQueue1000拉消息本地队列缓存消息最大数
pullInterval0拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize1批量消费,一次消费多少条消息
pullBatchSize32批量拉消息,一次最多拉多少条

4 PullConsumer配置

参数名默认值说明
consumerGroupDEFAULT_CONSUMERConsumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis20000长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend30000长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis10000非长轮询,拉消息超时时间,单位毫秒
messageModelBROADCASTING消息支持两种模式:集群消费和广播消费
messageQueueListener监听队列变化
offsetStore消费进度存储
registerTopics注册的topic集合
allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance算法实现策略

5 Message数据结构

字段名默认值说明
Topicnull必填,消息所属topic的名称
Bodynull必填,消息体
Tagsnull选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag
Keysnull选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。
Flag0选填,完全由应用来设置,RocketMQ不做干预
DelayTimeLevel0选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费
WaitStoreMsgOKTRUE选填,表示消息是否在服务器落盘后才返回应答。

6 系统配置

本小节主要介绍系统(JVM/OS)相关的配置。

6.1 JVM选项

推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。简单的JVM配置如下所示:

​ ​-server -Xms8g -Xmx8g -Xmn4g ​


如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。那些不关心启动时间的人可以启用它:
​ -XX:+AlwaysPreTouch
禁用偏置锁定可能会减少JVM暂停,
​ -XX:-UseBiasedLocking
至于垃圾回收,建议使用带JDK 1.8的G1收集器。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m   
-XX:G1ReservePercent=25 
-XX:InitiatingHeapOccupancyPercent=30

这些GC选项看起来有点激进,但事实证明它在的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:

-XX:+UseGCLogFileRotation   
-XX:NumberOfGCLogFiles=5 
-XX:GCLogFileSize=30m

如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统:

-Xloggc:/dev/shm/mq_gc_%p.log123   

6.2 Linux内核参数

os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的文档

  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)
  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。建议设置文件描述符的值为655350。
  • Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。