导致某些操作无法进行;
自动操作的过程是不可逆转的,因此需要记录操作历史;
基于性能考虑,大多数操作需要调用数据库的存储过程;
操作的数据需要具备一定的安全性,避免被非法用户对数据造成破坏;
与操作相关的功能以组件形式封装,保证组件的可重用性、可扩展性与可测试性;
数据量可能随着最终用户的增多而逐渐增大;
针对如上的业务需求,我们决定从以下几个方面对各种技术方案进行横向的比较与考量。
- 并发:选择的消息队列一定要很好地支持用户访问的并发性;
- 安全:消息队列是否提供了足够的安全机制;
- 性能伸缩:不能让消息队列成为整个系统的单一性能瓶颈;
- 部署:尽可能让消息队列的部署更为容易;
- 灾备:不能因为意外的错误、故障或其他因素导致处理数据的丢失;
- API易用性:处理消息的API必须足够简单、并能够很好地支持测试与扩展;
我们先后考察了MSMQ、Resque、ActiveMQ和RabbitMQ,通过查询相关资料,以及编写Spike代码验证相关质量,我们最终选择了RabbitMQ。
我们选择放弃MSMQ,是因为它严重依赖Windows操作系统;它虽然提供了易用的GUI方便管理人员对其进行安装和部署,但若要编写自动化部署脚本,却非常困难。同时,MSMQ的队列容量不能查过4M字节,这也是我们无法接收的。Resque的问题是目前仅支持Ruby的客户端调用,不能很好地与.NET平台集成。此外,Resque对消息持久化的处理方式是写入到Redis中,因而需要在已有RDBMS的前提下,引入新的Storage。我们比较倾心于ActiveMQ与RabbitMQ,但通过编写测试代码,采用循环发送大数据消息以验证消息中间件的性能与稳定性时,我们发现ActiveMQ的表现并不太让人满意。至少,在我们的询证调研过程中,ActiveMQ会因为频繁发送大数据消息而偶尔出现崩溃的情况。相对而言,RabbitMQ在各个方面都比较适合我们的架构要求。
例如在灾备与稳定性方面,RabbitMQ提供了可持久化的队列,能够在队列服务崩溃的时候,将未处理的消息持久化到磁盘上。为了避免因为发送消息到写入消息之间的延迟导致信息丢失,RabbitMQ引入了Publisher Confirm机制以确保消息被真正地写入到磁盘中。它对Cluster的支持提供了Active/Passive与Active/Active两种模式。例如,在Active/Passive模式下,一旦一个节点失败,Passive节点就会马上被激活,并迅速替代失败的Active节点,承担起消息传递的职责。如图8所示:
图8 Active/Passive Cluster(图片来自RabbitMQ官方网站)
在并发处理方面,RabbitMQ本身是基于erlang编写的消息中间件,作为一门面向并发处理的编程语言,erlang对并发处理的天生优势使得我们对RabbitMQ的并发特性抱有信心。RabbitMQ可以非常容易地部署到Windows、Linux等操作系统下,同时,它也可以很好地部署到服务器集群中。它的队列容量是没有限制的(取决于安装RabbitMQ的磁盘容量),发送与接收信息的性能表现也非常好。RabbitMQ提供了Java、.NET、Erlang以及C语言的客户端API,调用非常简单,并且不会给整个系统引入太多第三方库的依赖。 例如.NET客户端只需要依赖一个程序集。
即使我们选择了RabbitMQ,但仍有必要对系统与具体的消息中间件进行解耦,这就要求我们对消息的生产者与消费者进行抽象,例如定义如下的接口:
public interface IQueueSubscriber
{
void ListenTo<T>(string queueName, Action<T> action);
void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
}
public interface IQueueProvider
{
T Pop<T>(string queueName);
T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
void Push(FunctionalArea functionalArea, string routingKey, object payload);
}
在这两个接口的实现类中,我们封装了RabbitMQ的调用类,例如:
public class RabbitMQSubscriber : IQueueSubscriber
{
public void ListenTo<T>(string queueName, Action<T> action)
{
using (IConnection connection = _factory.OpenConnection())
using (IModel channel = connection.CreateModel())
{
var consumer = new QueueingBasicConsumer(channel);
string consumerTag = channel.BasicConsume(queueName, AcknowledgeImmediately, consumer);
var response = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
var serializer = new java scriptSerializer();
string json = Encoding.UTF8.GetString(response.Body);
var message = serializer.Deserialize<T>(json);
action(message);
}
}
}
public class RabbitMQProvider : IQueueProvider
{
public T Pop<T>(string queueName)
{
var returnVal = default(T);
const bool acknowledgeImmediately = true;
using (var connection = _factory.OpenConnection())