设为首页 加入收藏

TOP

muduo库如何支持多线程(二)
2013-11-20 14:23:42 来源: 作者: 【 】 浏览:953
Tags:muduo 如何 支持 线程


    /*每个线程初始化时的回调函数cb*/
    void EventLoopThreadPool::start(const ThreadInitCallback& cb)
    {
    assert(!started_);
    baseLoop_->assertInLoopThread();
    started_ = true;
    for (int i = 0; i < numThreads_; ++i)
    {
    EventLoopThread* t = new EventLoopThread(cb);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());    // 启动EventLoopThread线程,在进入事件循环之前,会调用cb
    }
    if (numThreads_ == 0 && cb)
    {
    // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
    cb(baseLoop_);
    }
    }
    EventLoop* EventLoopThreadPool::getNextLoop()
    {
    baseLoop_->assertInLoopThread();
    EventLoop* loop = baseLoop_;
    // 如果loops_为空,则loop指向baseLoop_
    // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
    if (!loops_.empty())
    {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
    next_ = 0;
    }
    }
    return loop;
    }
    TcpServer头文件
    TcpServer.h
    [cpp]
    // Copyright 2010, Shuo Chen.  All rights reserved.
    // http://code.google.com/p/muduo/
    //
    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
    //
    // This is a public header file, it must only include public header files.
    #ifndef MUDUO_NET_TCPSERVER_H
    #define MUDUO_NET_TCPSERVER_H
    #include <muduo/base/Types.h>
    #include <muduo/net/TcpConnection.h>
    #include <map>
    #include <boost/noncopyable.hpp>
    #include <boost/scoped_ptr.hpp>
    namespace muduo
    {
    namespace net
    {
    class Acceptor;
    class EventLoop;
    class EventLoopThreadPool;
    ///
    /// TCP server, supports single-threaded and thread-pool models.
    ///
    /// This is an interface class, so don't expose too much details.
    class TcpServer : boost::noncopyable
    {
    public:
    typedef boost::function<void(EventLoop*)> ThreadInitCallback;
    //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
    TcpServer(EventLoop* loop,
    const InetAddress& listenAddr,
    const string& nameArg);
    ~TcpServer();  // force out-line dtor, for scoped_ptr members.
    const string& hostport() const { return hostport_; }
    const string& name() const { return name_; }
    /// Set the number of threads for handling input.
    ///
    /// Always accepts new connection in loop's thread.
    /// Must be called before @c start
    /// @param numThreads
    /// - 0 means all I/O in loop's thread, no thread will created.
    ///   this is the default value.
    /// - 1 means all I/O in another thread.
    /// - N means a thread pool with N threads, new connections
    ///   are assigned on a round-robin basis.
    void setThreadNum(int numThreads);
    void setThreadInitCallback(const ThreadInitCallback& cb)
    { threadInitCallback_ = cb; }
    /// Starts the server if it's not listenning.
    ///
    /// It's harmless to call it multiple times.
    /// Thread safe.
    void start();
    /// Set connection callback.
    /// Not thread safe.
    // 设置连接到来或者连接关闭回调函数
    void setConnectionCallback(const ConnectionCallback& cb)
    { connectionCallback_ = cb; }
    /// Set message callback.
    /// Not thread safe.
    // 设置消息到来回调函数
    void setMessageCallback(const MessageCallback& cb)
    { messageCallback_ = cb; }
    private:
    /// Not thread safe, but in loop
    void newConnection(int sockfd, const InetAddress& peerAddr);
    /// Thread safe.
    void removeConnection(const TcpConnectionPtr& conn);
    /// Not thread safe, but in loop
    void removeConnectionInLoop(const TcpConnectionPtr& conn);
    typedef std::map<string, TcpConnectionPtr> ConnectionMap;
    EventLoop* loop_;  // the acceptor loop
    const string hostport_;       // 服务端口
    const string name_;           // 服务名
    boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
    boost::scoped_ptr<EventLoopThreadPool> threadPool_; //线程池
    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    ThreadInitCallback threadInitCallback_;   // IO线程池中的线程在进入事件循环前,会回调用此函数
    bool started_;
    // always in loop thread
    int nextConnId_;              // 下一个连接ID
    ConnectionMap connections_;   // 连接列表
    };
    }
    }
    #endif  // MUDUO_NET_TCPSERVER_H
    TcpServer源文件
    TcpServer.cc
    // Copyright 2010, Shuo Chen.  All rights reserved.
    // http://code.google.com/p/muduo/
    //
    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
    #include <muduo/net/TcpServer.h>
    #include <muduo/base/Logging.h>
    #include <muduo/net/Acceptor.h>
    #include <muduo/net/EventLoop.h>
    #include <muduo/net/EventLoopThreadPool.h>
    #include <muduo/net/SocketsOps.h>
    #include <boost/bind.hpp>
    #include <stdio.h>  // snprintf
    using namespace muduo;
    using namespace muduo::net;
    TcpServer::TcpServer(EventLoop* loop, /*这是main reactor*/
    const InetAddress& listenAddr,
    const string& nameArg)
    : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    threadPool_(new EventLoopThreadPool(loop)),
    /*connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),*/
    started_(false),
    nextConnId_(1)
    {
    // Acceptor::handleRead函数中会回调用TcpServer::newConnection
    // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
    acceptor_->setNewConnectionCallback(
    boost::bind(&TcpServer::newConnection, this, _1, _2));
    }
    TcpServer::~TcpServer()
    {
    loop_->assertInLoopThread();
    LOG_TRACE 《 "TcpServer::~TcpServer [" 《 name_ 《 "] destructing";
    for (ConnectionMap::iterator it(connections_.begin());
    it != connections_.end(); ++it)
    {
    TcpConnectionPtr conn = it->second;
    it->second.reset();      // 释放当前所控制的对象,引用计数减一
    conn->getLoop()->runInLoop(
    boost::bind(&TcpConnection::connectDestroyed, conn));
    conn.reset();           // 释放当前所控制的对象,引用计数减一
    }
    }
    void TcpServer::setThreadNum(int numThreads)
    {
    /*numThreads不包含main reactor thread*/
    assert(0 <= numThreads);
    threadPool_->setThreadNum(numThreads);
    }
    // 该函数多次调用是无害的
    // 该函数可以跨线程调用
    void TcpServer::start()
    {
    if (!started_)
    {
    started_ = true;
    /*启动线程池*/
    threadPool_->start(threadInitCallback_);
    }
    if (!acceptor_->listenning())
    {
    // get_pointer返回原生指针
    loop_->runInLoop(
    boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
    }
    }
    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
    {
    loop_->assertInLoopThread();
    // 按照轮叫的方式选择一个EventLoop
    EventLoop* ioLoop = threadPool_->getNextLoop();
    char buf[32];
    snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
    ++nextConnId_;
    string connName = name_ + buf;
    LOG_INFO 《 "TcpServer::newConnection [" 《 name_
    《 "] - new connection [" 《 connName
    《 "] from " 《 peerAddr.toIpPort();
    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    // FIXME poll with zero timeout to double confirm the new connection
    // FIXME use make_shared if necessary
    /*TcpConnectionPtr conn(new TcpConnection(loop_,
    connName,
    sockfd,
    localAddr,
    peerAddr));*/
    TcpConnectionPtr conn(new TcpConnection(ioLoop,
    connName,
    sockfd,
    localAddr,
    peerAddr));
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    connections_[connName] = conn;
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setCloseCallback(
    boost::bind(&TcpServer::removeConnection, this, _1));
    // conn->connectEstablished(); 这个表示直接在当前线程中调用
    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    }
    void TcpServer::removeConnection(const TcpConnectionPtr& conn)
    {
    /*
    loop_->assertInLoopThread();
    LOG_INFO 《 "TcpServer::removeConnectionInLoop [" 《 name_
    《 "] - connection " 《 conn->name();
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    size_t n = connections_.erase(conn->name());
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    (void)n;
    assert(n == 1);
    loop_->queueInLoop(
    boost::bind(&TcpConnection::connectDestroyed, conn));
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    */
    loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
    }
    void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
    {
    loop_->assertInLoopThread();
    LOG_INFO 《 "TcpServer::removeConnectionInLoop [" 《 name_
    《 "] - connection " 《 conn->name();
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    size_t n = connections_.erase(conn->name());
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    (void)n;
    assert(n == 1);
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
    boost::bind(&TcpConnection::connectDestroyed, conn));
    //loop_->queueInLoop(
    //    boost::bind(&TcpConnection::connectDestroyed, conn));
    LOG_TRACE 《 " usecount=" 《 conn.use_count();
    }

        

首页 上一页 1 2 3 4 5 6 7 下一页 尾页 2/13/13
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇二叉树的创建与递归遍历 下一篇快速幂和欧拉函数的优化求解

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·常用meta整理 | 菜鸟 (2025-12-25 01:21:52)
·SQL HAVING 子句:深 (2025-12-25 01:21:47)
·SQL CREATE INDEX 语 (2025-12-25 01:21:45)
·Shell 传递参数 (2025-12-25 00:50:45)
·Linux echo 命令 - (2025-12-25 00:50:43)