功能需求
这个连接服务器把多个客户连接汇聚为一个内部 TCP 连接,起到“数据串并转换”的作用,让 backend 的逻辑服务器专心处理业务,而无需顾及多连接的并发性。以下是系统的框图:

这个连接服务器的作用与数字电路中的数据选择器 (multiplexer) 类似,所以我把它命名为 multiplexer。(其实 IO-Multiplexing 也是取的这个意思,让一个 thread-of-control 能有选择地处理多个 IO 文件描述符。)
(上图取自 wikipedia,是 public domain 版权)
实现
Multiplexer 的功能需求不复杂,无非是在 backend connection 和 client connections 之间倒腾数据。具体来说,主要是处理四种事件:
对每个新 client connection 分配一个新的整数 id,如果 id 用完了,则断开新连接(这样通过控制 id 的数目就能控制最大连接数)。另外,为了避免 id 过快地被复用(有可能造成 backend 串话),multiplexer 采用 queue 来管理 free id,每次从队列的头部取 id,用完之后放回 queue 的尾部。
当 client connection 到达或断开时,向 backend 发出通知。onClientConnection() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#54
当从 client connection 收到数据时,把数据连同 connection id 一同发给 backend。onClientMessage() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#117
当从 backend connection 收到数据时,辨别数据是发给哪个 client connection,并执行相应的转发操作。onBackendMessage() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#194
如果 backend connection 断开连接,则断开所有 client connections(假设 client 会自动重试)。 onBackendConnection() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#162
由上可见,multiplexer 的功能与 proxy 颇为类似。multiplexer_simple.cc 是一个线程版的实现,借助 muduo 的 io-multiplexing 特性,可以方便地处理多个并发连接。
在实现的时候有两点值得注意:
TcpConnection 的 id 如何存放?当从 backend 收到数据,如何根据 id 找到对应的 client connection?当从 client connection 收到数据,如何得知其 id ?
第一个问题比较好解决,用 std::map〈int, TcpConnectionPtr〉 clientConns_; 保存从 id 到 client connection 的映射就行。
第二个问题固然可以用类似的办法解决,但是我想借此介绍一下 muduo::net::TcpConnection 的 context 功能。每个 TcpConnection 都有一个 boost::any 成员,可由客户代码自由支配(get/set),代码如下。这个 boost::any 是 TcpConnection 的 context,可以用于保存与 connection 绑定的任意数据(比方说 connection id、connection 的最后数据到达时间、connection 所代表的用户的名字等等)。这样客户代码不必继承 TcpConnection 就能 attach 自己的状态,而且也用不着 TcpConnectionFactory 了(如果允许继承,那么必然要向 TcpServer 注入此 factory)。
class TcpConnection : public boost::enable_shared_from_this,
boost::noncopyable
{
public:
void setContext(const boost::any& context)
{ context_ = context; }
boost::any& getContext()
{ return context_; }
const boost::any& getContext() const
{ return context_; }
// ...
private:
// ...
boost::any context_;
};
typedef boost::shared_ptr TcpConnectionPtr;
对于 Multiplexer,在 onClientConnection() 里调用 conn->setContext(id),把 id 存到 TcpConnection 对象中。onClientMessage() 从 TcpConnection 对象中取得 id,连同数据一起发送给 backend,完整实现如下:
void onClientMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
if (!conn->getContext().empty())
{
int id = boost::any_cast(conn->getContext());
sendBackendBuffer(id, buf);
}
else
{
buf->retrieveAll();
}
}
TcpConnection 的生命期如何管理?由于 Client Connection 是动态创建并销毁,其生与灭完全由客户决定,如何保证 backend 想向它发送数据的时候,这个 TcpConnection 对象还活着?解决思路是用 reference counting,当然,不用自己写,用 boost::shared_ptr 即可。TcpConnection 是 muduo 中唯一默认采用 shared_ptr 来管理生命期的对象,盖由其动态生命期的本质决定。更多内容请参考陈硕《当析构函数遇到多线程──C++ 中线程安全的对象回调》
multiplexer 是二进制协议,如何测试呢?
自动化测试
Multiplexer 是 muduo 网络编程示例中第一个具有 non-trivial 业务逻辑的网络程序,根据陈硕《分布式程序的自动化回归测试》一文的思想,我为它编写了 test harness。代码见 http://code.google.com/p/muduo/source/browse/trunk/examples/multiplexer/