设为首页 加入收藏

TOP

【RocketMQ】【源码】顺序消息实现原理(五)
2023-07-25 21:34:15 】 浏览:106
Tags:RocketMQ 源码
umeExecutor.submit(consumeRequest); } } }

消费时的消息队列锁

ConsumeRequestConsumeMessageOrderlyService的内部类,它有两个成员变量,分别为MessageQueue消息队列和它对应的处理队列ProcessQueue对象。
在run方法中,对消息进行消费,处理逻辑如下:

  1. 判断ProcessQueue是否被删除,如果被删除终止处理;
  2. 调用messageQueueLock的ftchLockObject方法获取消息队列的对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,可以设置多个线程同时进行消费,所以某个线程在进行消息消费的时候要对消息队列加锁,防止其他线程并发消费,破坏消息的顺序性
  3. 如果是广播模式、或者当前的消息队列已经加锁成功(Locked置为true)并且加锁时间未过期,开始对拉取的消息进行遍历:
  • 如果是集群模式并且消息队列加锁失败,调用tryLockLaterAndReconsume稍后重新进行加锁;
  • 如果是集群模式并且消息队列加锁时间已经过期,调用tryLockLaterAndReconsume稍后重新进行加锁;
  • 如果当前时间距离开始处理的时间超过了最大消费时间,调用submitConsumeRequestLater稍后重新进行处理;
  • 获取批量消费消息个数,从ProcessQueue获取消息内容,如果消息获取不为空,添加消息消费锁,然后调用messageListener的consumeMessage方法进行消息消费;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
   class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue; // 消息队列对应的处理队列
        private final MessageQueue messageQueue; // 消息队列

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            // 处理队列如果已经被置为删除状态,跳过不进行处理
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 获取消息队列的对象锁
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            // 对象消息队列的对象锁加锁
            synchronized (objLock) {
                // 如果是广播模式、或者当前的消息队列已经加锁成功并且加锁时间未过期
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 判断processQueue是否删除
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        // 如果是集群模式并且processQueue的加锁失败
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            // 稍后进行加锁
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // 如果是集群模式并且消息队列加锁时间已经过期
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            // 稍后进行加锁
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        // 如果当前时间距离开始处理的时间超过了最大消费时间
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            // 稍后重新进行处理
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        // 批量消费消息个数
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        // 获取消息内容
                        List<Messag
首页 上一页 2 3 4 5 6 7 下一页 尾页 5/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java面向对象--接口和多态 下一篇java -- static, 内部类, 权限, ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目