面的接口 puhlish 模拟了业务消息,由于我们依次发布了 红包/订单/投票 消息,所以迭代发布 10 次后,正好形成了一个时序错乱的信息队列,按照自动过期时序计算,当第一个红包超时到达时,第四条消息(红包)也会接着超时,可是由于此时订单和投票消息位于红包消息上面,该红包消息在达到超时时间后并不会被投递到 Consumer 消费队列,这是正确的,我们确实也是希望是这个结果
如果有一个办法把超时的消息自动将其提升到队列顶部就好了!
4. 处理复合死信
在 RabbitMQ 提供的 API 接口中,没有什么直接可用的能将死信队列中超时消息提升到顶部的好办法;但是,我们可以利用部分 API 接口的特性来完成这件事情。
4.1 定时消费客户端
下面,我们将使用一个定时消费客户端来完成对死信队列的轮询,充分利用 RabbitMQ 的消费特性来完成超时消息的位置提升。
过程如下图:
如上图所示,我们增加一个 dlx-timer 定时器,定时的发起对死信队列的消费,该消费者仅仅是消费,不确认消息,也就是不做 ack,然后将消息重新置入队列中;这个过程,就是将消息不断提升位置的过程。
4.2 定时消费客户端实现代码
public class CdlxTimerService : MQServiceBase
{
public override string vHost { get { return "test_mq"; } }
public override string Exchange { get { return "cdlx-Exchange"; } }
public override List<BindInfo> Binds => new List<BindInfo>();
private string queue = "cdlx-Master";
public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
{
}
/// <summary>
/// 检查死信队列
/// </summary>
/// <returns></returns>
public List<CdlxMessage> CheckMessage()
{
long total = 0;
List<CdlxMessage> list = new List<CdlxMessage>();
var connection = base.CreateConnection();
using (IModel channel = connection.Connection.CreateModel())
{
bool latest = true;
while (latest)
{
BasicGetResult result = channel.BasicGet(this.queue, false);
total++;
latest = result != null;
if (latest)
{
var json = Encoding.UTF8.GetString(result.Body);
list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json));
}
}
channel.Close();
connection.Close();
}
return list;
}
}
上面的代码首先在定时调用到来的时候,创建了一个 Connection,然后利用此 Connection 创建了了一个 Channel,紧接着,使用该 Channel 调用 BasicGet 方法,获得队列顶部的信息,且设置 autoAck=false,表示仅检查消息,不确认,然后进入一个 while 迭代过程,一直读取到队列底部,获得所有队列中的信息,最后,关闭了通道释放连接。
这样,就完成了一次消息检查的过程,在调用 BasicGet 后,下一条信息将会出现在队列的顶部,同步,队列将自动对该消息进行超时检查,由于我们在调用 BasicGet 的时候,传入 autoAck=false,不确认该消息,在 RabbitMQ 控制台中,将显示为 unacted,所以在释放连接后,所有消息将会被重新置入队列中,这是一个自动的过程,无需我们做额外的工作。
4.3 Consumer(死信消费队列)最终处理业务
配置队列管理随程序启动停止
private MQServcieManager serviceManager;
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime)
{
serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>());
lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); });
lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); });
...
}
实现消费队列
public class CdlxConsumerService : MQServiceBase
{
public override string vHost { get { return "test_mq"; } }
public override string Exchange { get { return "cdlx-Exchange"; } }
private string queue = "cdlx-Consumer";
private string routeKey = "all";
private List<BindInfo> bs = new List<BindInfo>();
public override List<BindInfo> Binds { get { return bs; } }
public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
{
this.bs.Add(new BindInfo
{
ExchangeType = ExchangeType.Direct,
Queue = this.queue,
RouterKey = this.routeKey,
OnReceived = this.OnReceived
});
}
private void OnReceived(MessageBody body)
{
var message = Js