ol writable);
bool readEnabled(); //返回是否关注了可读return events_ & kReadEvent;
bool writeEnabled();//返回是否关注了可写return events_ & kWriteEvent;
//处理读写事件
void handleRead() { readcb_(); } //在poller的loop_once循环中,会根据struct epoll_event.data.ptr转换为Channel,如果可读则调用对应的handleRead
void handleWrite() { writecb_(); }//在poller的loop_once循环中,会根据struct epoll_event.data.ptr转换为Channel,如果可写则调用对应的handleWrite
}
在TCP数据能收到(回调)后,重要的是如何保存客户端的数据,处理完请求后发送给对应的客户端,因为有多个客户端的存在,因此要使用TcpConn来记录哪些TCP到来了,处理结果要回馈给哪个数据。
//conn.h
// Tcp连接,使用引用计数
struct TcpConn : public std::enable_shared_from_this<TcpConn> {
// Tcp连接的个状态
enum State {
Invalid = 1,
Handshaking,
Connected,
Closed,
Failed,
};
//服务端
static TcpConnPtr createConnection(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) {
TcpConnPtr con(new C);
con->attach(base, fd, local, peer);
return con;
}
void attach(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) {
fatalif((destPort_ <= 0 && state_ != State::Invalid) || (destPort_ >= 0 && state_ != State::Handshaking),
"you should use a new TcpConn to attach. state: %d", state_);
base_ = base;
state_ = State::Handshaking;
local_ = local;
peer_ = peer;
delete channel_;
channel_ = new Channel(base, fd, kWriteEvent | kReadEvent);
trace("tcp constructed %s - %s fd: %d", local_.toString().c_str(), peer_.toString().c_str(), fd);
TcpConnPtr con = shared_from_this();
con->channel_->onRead([=] { con->handleRead(con); });
con->channel_->onWrite([=] { con->handleWrite(con); });
}
//发送数据
void sendOutput() { send(output_); }//return ::write(channel_->fd, buf, bytes);if (wd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) //写对应fd,如果写失败关注可写事件(水平触发模式)
//收到数据
void handleRead(const TcpConnPtr &con) {
while (state_ == State::Connected) {
input_.makeRoom();
int rd = readImp(channel_->fd(), input_.end(), input_.space());
if(rd > 0) input_.addSize(rd);
}
//...
}
//客户端
template <class C = TcpConn>
static TcpConnPtr createConnection(EventBase *base, const std::string &host, unsigned short port, int timeout = 0, const std::string &localip = "") {
TcpConnPtr con(new C);
con->connect(base, host, port, timeout, localip); //执行connect
return con;
}
public:
EventBase *base_; //属于哪一个事件循环
Channel *channel_; //属于哪一个通道
Buffer input_, output_; //输入和输出缓冲区
Ip4Addr local_, peer_; //本地的套接字
State state_; //连接状态
TcpCallBack readcb_, writablecb_, statecb_;//读写,连入/练出状态回调
std::string destHost_, localIp_;
std::unique_ptr<CodecBase> codec_; //对应codec
};
//服务器
struct TcpServer {
TcpServer(EventBases *bases); //属于哪一个事件循环
int bind(const std::string &host, unsigned short port, bool reusePort = false); //socket,bind,listen,创建listen_channel设置读回调为handleAccept()
static TcpServerPtr startServer(EventBases *bases, const std::string &host, unsigned short port, bool reusePort = false); //创建一个TcpServer,并调用bind函数
void onConnState(const TcpCallBack &cb);//有新的连接连入
// 消息处理与Read回调冲突,只能调用一个
void onConnMsg(CodecBase *codec, const MsgCallBack &cb) {
codec_.reset(codec);
msgcb_ = cb;
assert(!readcb_);
}
private:
EventBase *base_;//属于哪一个事件循环
Ip4Addr addr_; //服务端地址
Channel *listen_channel_; //服务端的Channel
TcpCallBack statecb_, readcb_; //读写回调
MsgCa