如何在 SpringBoot 项目中控制 RocketMQ消费线程数量
1 背景
最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个 topic 消费线程的最小数量和最大数量,用来区分不同 topic 吞吐量不同。
我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。
2 RocketMQ 消息监听
设置消费者组为 my_consumer_group,监听 TopicTest 队列,并使用并发消息监听器MessageListenerConcurrently
1public class Consumer { 2 3 public static void main(String[] args) throws InterruptedException, MQClientException { 4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group"); 5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 6 consumer.subscribe("TopicTest", "*"); 7 consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); 8 consumer.registerMessageListener(new MessageListenerConcurrently() { 9 @Override 10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 11 ConsumeConcurrentlyContext context) { 12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 14 } 15 }); 16 consumer.start(); 17 System.out.printf("Consumer Started.%n"); 18 } 19}
3 RocketMQ 中连接结构图
4 消费监听器
接口:org.apache.rocketmq.client.consumer.listener.MessageListener
有两个子接口:
- 顺序消费:MessageListenerOrderly - 并发消费: MessageListenerConcurrently
4.1 MessageListenerConcurrently
作用:consumer并发消费消息的监听器
比如,在 quick start 中,就是使用的并发消费消息监听器:
1 consumer.registerMessageListener(new MessageListenerConcurrently() { 2 @Override 3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 4 ConsumeConcurrentlyContext context) { 5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 7 } 8 });
方法返回值,是个枚举:
1 package org.apache.rocketmq.client.consumer.listener; 2 3/** 4 * 并发消费mq消息结果 5 */ 6public enum ConsumeConcurrentlyStatus { 7 8 /** 9 * Success consumption 10 * 成功消费 11 */ 12 CONSUME_SUCCESS, 13 14 /** 15 * Failure consumption,later try to consume 16 * 失败消费,稍后尝试消费 17 * 18 * 19 * 如果 {@link MessageListener}返回的消费结果为 RECONSUME_LATER,则需要将这些消息发送给Broker延迟消息。 20 * 如果给broker发送消息失败,将延迟5s后提交线程池进行消费。 21 * 22 * RECONSUME_LATER的消息发送入口: MQClientAPIImpl#consumerSendMessageBack, 23 * 命令编码: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK} 24 */ 25 RECONSUME_LATER; 26}
画外音:
当前,我们在具体开发中,肯定不会直接使用这种方式来写consumer。
常用的Consumer实现是:基于 推 的consumer:DefaultMQPushConsumer
4.2 MessageListenerOrderly
作用:consumer顺序消费消息的监听器
5 消费线程池
5.1 DefaultMQPushConsumer
作用:基于 推 的consumer消费者
5.2 注册并发消息监听器
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener
当使用这个方法注册消息监听器时,实际上会把这个病发消息监听器设置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner属性中。
5.3 设置 consumer 消费 service
可选有两种:
并发消费的service
顺序消费的service
当consumer在启动的时,会使用MessageListener具体实现类型进行判断:
MessageListener 就有并发和顺序两种,所以service也有两种。
1public synchronized void start() throws MQClientException { 2 switch (this.serviceState) { 3 case CREATE_JUST: 4 5 // 省略一部分代码........... 6 7 // 根据注册的监听器类型[并发消息监听器/顺序执行消息监听器],来确定使用哪种消费服务. 8 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { 9 this.consumeOrderly = true; 10 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); 11 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { 12 this.consumeOrderly = false; 13 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); 14 } 15 this.consumeMessageService.start(); 16 17 // 省略一部分代码.......... 18 this.serviceState = ServiceState.RUNNING; 19 break; 20 case RUNNING: 21 case START_FAILED: 22 case SHUTDOWN_ALREADY: 23 throw new MQClientException("The PushConsumer service state not OK, maybe started once"); 24 default: 25 break; 26 } 27 28 // 省略一部分代码.......... 29 }
如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService :
在实例化的时候,会创建一个线程池:
1// 无界队列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫无意义了. 2this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); 3this.consumeExecutor = new ThreadPoolExecutor( 4 this.defaultMQPushConsumer.getConsumeThreadMin(), // 默认20 5 this.defaultMQPushConsumer.getConsumeThreadMax(), // 默认64 6 1000 * 60, 7 TimeUnit.MILLISECONDS, 8 this.consumeRequestQueue, 9 new ThreadFactoryImpl("ConsumeMessageThread_"));
consumer消费线程池参数:
- 默认最小消费线程数 20
- 默认最大消费线程数 64
- keepAliveTime = 60*1000 单位:秒
- 队列:new LinkedBlockingQueue<>() 无界队列
- 线程名称:前缀是:ConsumeMessageThread_
注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。
5.4 修改线程池线程数
上面我们已经知道了,设置线程池的最大线程数是没什么用的。
那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。
1public static void main(String[] args) throws InterruptedException, MQClientException { 2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); 3 4 consumer.setConsumeThreadMin(30); 5 consumer.setConsumeThreadMax(64); 6 7 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 8 consumer.subscribe("TopicTest", "*"); 9 consumer.registerMessageListener(new MessageListenerConcurrently() { 10 11 @Override 12 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 13 ConsumeConcurrentlyContext context) { 14 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 15 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 16 } 17 }); 18 consumer.start(); 19 System.out.printf("Consumer Started.%n"); 20 }
注意:consumeThreadMin 如果大于64,则也需要设置 consumeThreadMax 参数,因为有个校验:
-修改线程池线程数-SpringBoot版
如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数: