设为首页 加入收藏

TOP

消息队列一:为什么需要消息队列(MQ)?(四)
2019-09-17 19:06:13 】 浏览:198
Tags:消息 队列 为什么 需要
导致某些操作无法进行;
  • 自动操作的过程是不可逆转的,因此需要记录操作历史;
  • 基于性能考虑,大多数操作需要调用数据库的存储过程;
  • 操作的数据需要具备一定的安全性,避免被非法用户对数据造成破坏;
  • 与操作相关的功能以组件形式封装,保证组件的可重用性、可扩展性与可测试性;
  • 数据量可能随着最终用户的增多而逐渐增大;
  • 针对如上的业务需求,我们决定从以下几个方面对各种技术方案进行横向的比较与考量。

    • 并发:选择的消息队列一定要很好地支持用户访问的并发性;
    • 安全:消息队列是否提供了足够的安全机制;
    • 性能伸缩:不能让消息队列成为整个系统的单一性能瓶颈;
    • 部署:尽可能让消息队列的部署更为容易;
    • 灾备:不能因为意外的错误、故障或其他因素导致处理数据的丢失;
    • API易用性:处理消息的API必须足够简单、并能够很好地支持测试与扩展;

    我们先后考察了MSMQ、Resque、ActiveMQ和RabbitMQ,通过查询相关资料,以及编写Spike代码验证相关质量,我们最终选择了RabbitMQ。

    我们选择放弃MSMQ,是因为它严重依赖Windows操作系统;它虽然提供了易用的GUI方便管理人员对其进行安装和部署,但若要编写自动化部署脚本,却非常困难。同时,MSMQ的队列容量不能查过4M字节,这也是我们无法接收的。Resque的问题是目前仅支持Ruby的客户端调用,不能很好地与.NET平台集成。此外,Resque对消息持久化的处理方式是写入到Redis中,因而需要在已有RDBMS的前提下,引入新的Storage。我们比较倾心于ActiveMQ与RabbitMQ,但通过编写测试代码,采用循环发送大数据消息以验证消息中间件的性能与稳定性时,我们发现ActiveMQ的表现并不太让人满意。至少,在我们的询证调研过程中,ActiveMQ会因为频繁发送大数据消息而偶尔出现崩溃的情况。相对而言,RabbitMQ在各个方面都比较适合我们的架构要求。

    例如在灾备与稳定性方面,RabbitMQ提供了可持久化的队列,能够在队列服务崩溃的时候,将未处理的消息持久化到磁盘上。为了避免因为发送消息到写入消息之间的延迟导致信息丢失,RabbitMQ引入了Publisher Confirm机制以确保消息被真正地写入到磁盘中。它对Cluster的支持提供了Active/Passive与Active/Active两种模式。例如,在Active/Passive模式下,一旦一个节点失败,Passive节点就会马上被激活,并迅速替代失败的Active节点,承担起消息传递的职责。如图8所示:

    图8 Active/Passive Cluster(图片来自RabbitMQ官方网站)

    在并发处理方面,RabbitMQ本身是基于erlang编写的消息中间件,作为一门面向并发处理的编程语言,erlang对并发处理的天生优势使得我们对RabbitMQ的并发特性抱有信心。RabbitMQ可以非常容易地部署到Windows、Linux等操作系统下,同时,它也可以很好地部署到服务器集群中。它的队列容量是没有限制的(取决于安装RabbitMQ的磁盘容量),发送与接收信息的性能表现也非常好。RabbitMQ提供了Java、.NET、Erlang以及C语言的客户端API,调用非常简单,并且不会给整个系统引入太多第三方库的依赖。 例如.NET客户端只需要依赖一个程序集。

    即使我们选择了RabbitMQ,但仍有必要对系统与具体的消息中间件进行解耦,这就要求我们对消息的生产者与消费者进行抽象,例如定义如下的接口:

        public interface IQueueSubscriber
        {
            void ListenTo<T>(string queueName, Action<T> action);
            void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
            void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
        }
    
        public interface IQueueProvider
        {
            T Pop<T>(string queueName);
            T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
            T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
            void Push(FunctionalArea functionalArea, string routingKey, object payload);
        }
    
    

    在这两个接口的实现类中,我们封装了RabbitMQ的调用类,例如:

        public class RabbitMQSubscriber : IQueueSubscriber
        {
            public void ListenTo<T>(string queueName, Action<T> action)
            {
                using (IConnection connection = _factory.OpenConnection())
                using (IModel channel = connection.CreateModel())
                {
                    var consumer = new QueueingBasicConsumer(channel);
                    string consumerTag = channel.BasicConsume(queueName, AcknowledgeImmediately, consumer);
    
                    var response = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    var serializer = new java scriptSerializer();
                    string json = Encoding.UTF8.GetString(response.Body);
                    var message = serializer.Deserialize<T>(json);
    
                    action(message);
                }
            }       
        }
        public class RabbitMQProvider : IQueueProvider
        {
    
            public T Pop<T>(string queueName)
            {
                var returnVal = default(T);
                const bool acknowledgeImmediately = true;
    
                using (var connection = _factory.OpenConnection())
    
    首页 上一页 1 2 3 4 5 6 下一页 尾页 4/6/6
    】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
    上一篇Serverless无服务应用架构纵横谈 下一篇Python 环境搭建

    最新文章

    热门文章

    Hot 文章

    Python

    C 语言

    C++基础

    大数据基础

    linux编程基础

    C/C++面试题目