设为首页 加入收藏

TOP

RabbitMQ死信队列另类用法之复合死信(二)
2019-09-17 18:24:36 】 浏览:43
Tags:RabbitMQ 死信 队列 另类 用法 复合
面的接口 puhlish 模拟了业务消息,由于我们依次发布了 红包/订单/投票 消息,所以迭代发布 10 次后,正好形成了一个时序错乱的信息队列,按照自动过期时序计算,当第一个红包超时到达时,第四条消息(红包)也会接着超时,可是由于此时订单和投票消息位于红包消息上面,该红包消息在达到超时时间后并不会被投递到 Consumer 消费队列,这是正确的,我们确实也是希望是这个结果

如果有一个办法把超时的消息自动将其提升到队列顶部就好了!

4. 处理复合死信

在 RabbitMQ 提供的 API 接口中,没有什么直接可用的能将死信队列中超时消息提升到顶部的好办法;但是,我们可以利用部分 API 接口的特性来完成这件事情。

4.1 定时消费客户端

下面,我们将使用一个定时消费客户端来完成对死信队列的轮询,充分利用 RabbitMQ 的消费特性来完成超时消息的位置提升。

过程如下图:

如上图所示,我们增加一个 dlx-timer 定时器,定时的发起对死信队列的消费,该消费者仅仅是消费,不确认消息,也就是不做 ack,然后将消息重新置入队列中;这个过程,就是将消息不断提升位置的过程。

4.2 定时消费客户端实现代码
    public class CdlxTimerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        public override List<BindInfo> Binds => new List<BindInfo>();
        private string queue = "cdlx-Master";

        public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
        }

        /// <summary>
        ///  检查死信队列
        /// </summary>
        /// <returns></returns>
        public List<CdlxMessage> CheckMessage()
        {
            long total = 0;
            List<CdlxMessage> list = new List<CdlxMessage>();
            var connection = base.CreateConnection();
            using (IModel channel = connection.Connection.CreateModel())
            {
                bool latest = true;
                while (latest)
                {
                    BasicGetResult result = channel.BasicGet(this.queue, false);
                    total++;
                    latest = result != null;
                    if (latest)
                    {
                        var json = Encoding.UTF8.GetString(result.Body);
                        list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json));
                    }
                }
                channel.Close();
                connection.Close();
            }
            return list;
        }
    }

上面的代码首先在定时调用到来的时候,创建了一个 Connection,然后利用此 Connection 创建了了一个 Channel,紧接着,使用该 Channel 调用 BasicGet 方法,获得队列顶部的信息,且设置 autoAck=false,表示仅检查消息,不确认,然后进入一个 while 迭代过程,一直读取到队列底部,获得所有队列中的信息,最后,关闭了通道释放连接。

这样,就完成了一次消息检查的过程,在调用 BasicGet 后,下一条信息将会出现在队列的顶部,同步,队列将自动对该消息进行超时检查,由于我们在调用 BasicGet 的时候,传入 autoAck=false,不确认该消息,在 RabbitMQ 控制台中,将显示为 unacted,所以在释放连接后,所有消息将会被重新置入队列中,这是一个自动的过程,无需我们做额外的工作。

4.3 Consumer(死信消费队列)最终处理业务

配置队列管理随程序启动停止

        private MQServcieManager serviceManager;
        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime)
        {
            serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>());
            lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); });
            lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); });
            ...
        }

实现消费队列

    public class CdlxConsumerService : MQServiceBase
    {
        public override string vHost { get { return "test_mq"; } }
        public override string Exchange { get { return "cdlx-Exchange"; } }
        private string queue = "cdlx-Consumer";
        private string routeKey = "all";
        private List<BindInfo> bs = new List<BindInfo>();
        public override List<BindInfo> Binds { get { return bs; } }

        public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
        {
            this.bs.Add(new BindInfo
            {
                ExchangeType = ExchangeType.Direct,
                Queue = this.queue,
                RouterKey = this.routeKey,
                OnReceived = this.OnReceived
            });
        }

        private void OnReceived(MessageBody body)
        {
            var message = Js
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇.net core 分布式配置中心 下一篇使用ASP.NET Core2.2创建WebApp

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目