umeExecutor.submit(consumeRequest);
}
}
}
消费时的消息队列锁
ConsumeRequest
是ConsumeMessageOrderlyService
的内部类,它有两个成员变量,分别为MessageQueue
消息队列和它对应的处理队列ProcessQueue
对象。
在run方法中,对消息进行消费,处理逻辑如下:
- 判断
ProcessQueue
是否被删除,如果被删除终止处理;
- 调用messageQueueLock的ftchLockObject方法获取消息队列的对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,可以设置多个线程同时进行消费,所以某个线程在进行消息消费的时候要对消息队列加锁,防止其他线程并发消费,破坏消息的顺序性;
- 如果是广播模式、或者当前的消息队列已经加锁成功(Locked置为true)并且加锁时间未过期,开始对拉取的消息进行遍历:
- 如果是集群模式并且消息队列加锁失败,调用tryLockLaterAndReconsume稍后重新进行加锁;
- 如果是集群模式并且消息队列加锁时间已经过期,调用tryLockLaterAndReconsume稍后重新进行加锁;
- 如果当前时间距离开始处理的时间超过了最大消费时间,调用submitConsumeRequestLater稍后重新进行处理;
- 获取批量消费消息个数,从ProcessQueue获取消息内容,如果消息获取不为空,添加消息消费锁,然后调用messageListener的consumeMessage方法进行消息消费;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue; // 消息队列对应的处理队列
private final MessageQueue messageQueue; // 消息队列
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
@Override
public void run() {
// 处理队列如果已经被置为删除状态,跳过不进行处理
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 获取消息队列的对象锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
// 对象消息队列的对象锁加锁
synchronized (objLock) {
// 如果是广播模式、或者当前的消息队列已经加锁成功并且加锁时间未过期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 判断processQueue是否删除
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 如果是集群模式并且processQueue的加锁失败
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
// 稍后进行加锁
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 如果是集群模式并且消息队列加锁时间已经过期
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
// 稍后进行加锁
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
// 如果当前时间距离开始处理的时间超过了最大消费时间
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
// 稍后重新进行处理
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 批量消费消息个数
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 获取消息内容
List<Messag