设为首页 加入收藏

TOP

RabbitMQ死信队列另类用法之复合死信(一)
2019-09-17 18:24:36 】 浏览:42
Tags:RabbitMQ 死信 队列 另类 用法 复合

前言

在业务开发过程中,我们常常需要做一些定时任务,这些任务一般用来做监控或者清理任务,比如在订单的业务场景中,用户在创建订单后一段时间内,没有完成支付,系统将自动取消该订单,并将库存返回到商品中,又比如在微信中,用户发出红包24小时后,需要对红包进行检查,是否已领取完成,如未领取完成,将剩余金额退回到发送者钱包中,同时销毁该红包。

在项目初始阶段,或者是一些小型的项目中,常常采用定时轮询的方法进行检查,但是我们都知道,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。

下面,本文将演示如何使用一个 RabbitMQ 的死信队列同时监控多种业务(复合业务),达到模块解耦,释放压力的目的。

注意:名词“复合死信”是为了叙述方便临时创造的,如有不妥,欢迎指正

1. 什么是 RabbitMQ 死信队列

DLX(Dead Letter Exchanges)死信交换,死信队列本身也是一个普通的消息队列,在创建队列的时候,通过设置一些关键参数,可以将一个普通的消息队列设置为死信队列,与其它消息队列不同的是,其入栈的消息根据入栈时指定的过期时间/被拒绝/超出队列长度被移除,依次被转发到指定的消息队列中进行二次处理。这样说法比较拗口,其原理就是死信队列内位于顶部的消息过期时,该消息将被马上发送到另外一个订阅者(消息队列)中。

其原理入下图

由上图可以看到,目前有三种类型的业务需要使用 DLX 进行处理,因为每个业务的超时时间不一致的问题,如果将他们都放入一个 DLX 中进行处理,将会出现一个时序的问题,即消息队列总数处理顶部的消息,如果顶部的消息未过期,而底部的消息过期,这就麻烦了,因为过期的消息无法得到消费,将会造成延迟;所以正常情况下,最好的办法是每个业务都独立一个队列,这样就可以保证,即将过期的消息总是处于队列的顶部,从而被第一时间处理。

但是多个 DLX 又带来了管理上面的问题,随着业务的增加,越来越多的业务需要进入不同的 DLX ,这个时候我们发现,由于人手不足的原因,维护这么多 DLX 实在是太吃力了,如果能将这些消息都接入一个 DLX 中该多好呀,在一个 DLX 中进行消息订阅,然后进行分发或者处理,这就非常有趣了。

下面就按照这个思路,我们进行集中处理,也就是复合死信交换 CDLX(Composite Dead Letter Exchanges)

2. 如何创建死信队列

创建 DLX 队列的方式非常简单,我们使用 RabbitMQ Web 控制面板进行创建 Exhcange(交换机)/Consumer(死信消费队列)/cdlx(复合死信队列)

2.1 创建队列

创建交换机 cdlx-Exchange

死信消费队列 cdlx-Consumer

复合死信队列 cdlx-Master

  • 注意,这里添加死信队列必须同时设置死信转发交换机和路由,后续通过路由绑定实现消费队列

路由绑定

上面的路由绑定共有两个,分别是 Master 和 Consumer 用于消息路由到队列,为下面的业务消息做准备,建好后的队列如下

3.复合业务进入死信队列

当建立好队列以后,我们就可以专心的处理业务了,下面就来模拟3种业务将消息发送到死信队列的过程

3.1 发送死信消息到队列

发送消息使用了 Asp.NetCore轻松学-实现一个轻量级高可复用的RabbitMQ客户端 中的轻量客户端,封装后的发送消息代码如下

public class CdlxMasterService
    {
        private IConfiguration cfg = null;
        private ILogger logger = null;
        private string vhost = "test_mq";
        private string exchange = "cdlx-Exchange";
        private string routekey = "master";
        private static MQConnection connection = null;

        private MQConnection Connection
        {
            get
            {
                if (connection == null || !connection.Connection.IsOpen)
                {
                    connection = new MQConnection(
                        cfg["rabbitmq:username"],
                        cfg["rabbitmq:password"],
                        cfg["rabbitmq:host"],
                        Convert.ToInt32(cfg["rabbitmq:port"]),
                        vhost,
                        logger);
                }
                return connection;
            }
        }

        private static IModel channel = null;
        private IModel Channel
        {
            get
            {
                if (channel == null || channel.IsClosed)
                    channel = Connection.Connection.CreateModel();

                return channel;
            }
        }

        public void SendMessage(object data)
        {
            string message = JsonConvert.SerializeObject(data);
            this.Connection.Publish(this.Channel, exchange, routekey, message);
        }
    }
3.2 将 CdlxMasterService 注入到服务
  public void ConfigureServices(IServiceCollection services)
        {
           services.AddSingleton<CdlxMasterService>();
           ...
        }
3.3 模拟3种业务生产死信消息
    public class HomeController : Controller
    {
        private CdlxMasterService masterService;
        public HomeController(CdlxMasterService masterService)
        {
            this.masterService = masterService;
        }

        [HttpGet("publish")]
        public int Publish()
        {
            Contract contract = new Contract(this.masterService);
            for (int i = 0; i < 10; i++)
            {
                contract.Publish(MessageType.RedPackage, "红包信息,超时时间1024s");
                contract.Publish(MessageType.Order, "订单信息,超时时间2048s");
                contract.Publish(MessageType.Vote, "投票信息,超时时间4096s");
            }
            return 0;
        }
    }

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇.net core 分布式配置中心 下一篇使用ASP.NET Core2.2创建WebApp

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目