libuv 异步网络编程之 TCP 源码分析
发布于: 2020 年 08 月 13 日
在https://xie.infoq.cn/article/8eae86e25998538a12f11b7b3中,我们了解了 libuv 中 tcp 相关函数的使用;接下来,我们掀开 libuv 的外表,看看 libuv 是怎样对 tcp 相关的系统接口进行封装的。
libuv 版本:1.38.1,操作系统:macOS 10.15.6
tldr:tcp 的相关操作还是原来的 bind/listen/accept/connect,不一样的地方是将这些操作后的 io 事件封装到了异步模型里。
uv_tcp_bind
服务端在监听端口之前,需要创建 socket 并调用系统接口 bind 将 socket 绑定到指定地址。
int uv_tcp_bind(uv_tcp_t* handle, const struct sockaddr* addr, unsigned int flags) { unsigned int addrlen; if (addr->sa_family == AF_INET) // IPv4 addrlen = sizeof(struct sockaddr_in); else if (addr->sa_family == AF_INET6) // IPv6 addrlen = sizeof(struct sockaddr_in6); return uv__tcp_bind(handle, addr, addrlen, flags);}int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, unsigned int flags) { int err; int on; // 因为我们没有创建 socket,maybe_new_socket 里会创建 socket err = maybe_new_socket(tcp, addr->sa_family, 0); if (err) return err; on = 1; // 默认复用端口 if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) return UV__ERR(errno); errno = 0; // 将创建的 socket fd 绑定到 addr if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) { if (errno == EAFNOSUPPORT) /* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a * socket created with AF_INET to an AF_INET6 address or vice versa. */ return UV_EINVAL; return UV__ERR(errno); } tcp->delayed_error = UV__ERR(errno); tcp->flags |= UV_HANDLE_BOUND; if (addr->sa_family == AF_INET6) tcp->flags |= UV_HANDLE_IPV6; return 0;}static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { // 忽略已创建 socket 的情况 return new_socket(handle, domain, flags);}static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; int sockfd; int err; // 创建 socket err = uv__socket(domain, SOCK_STREAM, 0); ... sockfd = err; // 设置 tcp 选项;比如关闭拥塞控制 naggle 算法,开启 tcp 的 keepalive // sockfd 将保存到 handle 里 err = uv__stream_open((uv_stream_t*) handle, sockfd, flags); if (err) { ... } ... return 0;}
uv_listen
开始监听端口,接收连接请求。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int err; switch (stream->type) { case UV_TCP: err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); break; ... } if (err == 0) // 将 stream 的 handle 状态更新为 active uv__handle_start(stream); return err;}int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { static int single_accept_cached = -1; unsigned long flags; int single_accept; int err; if (tcp->delayed_error) return tcp->delayed_error; ... // 处理单次连接的情况 flags = 0; // 对于已 uv_tcp_bind 的 socket,maybe_new_socket 不做任何有效操作 err = maybe_new_socket(tcp, AF_INET, flags); // 开始监听端口,该端口就是 uv_tcp_bind 里指定的地址端口 if (listen(tcp->io_watcher.fd, backlog)) return UV__ERR(errno); // 设置新连接的回调函数 tcp->connection_cb = cb; tcp->flags |= UV_HANDLE_BOUND; /* Start listening for connections. */ // cb 将在 uv__server_io 里调用 tcp->io_watcher.cb = uv__server_io; // 将 listen 的 io 事件加入到 loop 里 uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN); return 0;}
uv__server_io
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; int err; stream = container_of(w, uv_stream_t, io_watcher); ... // 新增一个 listen io 事件,因为原来的那个 io 事件已经被消费掉了 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); /* connection_cb can close the server socket while we're * in the loop so check it on each iteration. */ while (uv__stream_fd(stream) != -1) { // 接收新的 fd err = uv__accept(uv__stream_fd(stream)); if (err < 0) { // 错误处理 ... // 调用用户回调函数去处理错误 stream->connection_cb(stream, err); continue; } UV_DEC_BACKLOG(w) // 保存接收到的 fd 并调用用户设置的新连接回调函数 stream->accepted_fd = err; stream->connection_cb(stream, 0); // 判断回调函数是否已接收 accepted fd // 处理单次连接的情况 } }}int uv__accept(int sockfd) { int peerfd; int err; do // 真正的 accept 操作 peerfd = accept(sockfd, NULL, NULL); while (peerfd == -1 && errno == EINTR); // 设置 cloexec 和 nonblock err = uv__cloexec(peerfd, 1); err = uv__nonblock(peerfd, 1); ... return peerfd;}
uv_accept
获取已经 accepted 的 fd。
真正的 accept 操作不在这里,在 uv__server_io 里
int uv_accept(uv_stream_t* server, uv_stream_t* client) { int err; switch (client->type) { case UV_TCP: // 将 accepted_fd 保存到 client err = uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); if (err) { ... } break; ... default: return UV_EINVAL; } client->flags |= UV_HANDLE_BOUND;done: // 读取下一个 accepted_fd 获取新增一个 listen io 事件 /* Process queued fds */ ... return err;}
uv_tcp_connect
int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, uv_connect_cb cb) { unsigned int addrlen; if (addr->sa_family == AF_INET) addrlen = sizeof(struct sockaddr_in); else if (addr->sa_family == AF_INET6) addrlen = sizeof(struct sockaddr_in6); return uv__tcp_connect(req, handle, addr, addrlen, cb);}int uv__tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, unsigned int addrlen, uv_connect_cb cb) { int err; int r; // 对于 client 向 server 发起新连接的情况,maybe_new_socket 会新建一个 socket err = maybe_new_socket(handle, addr->sa_family, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); do { errno = 0; // 调用系统的 connect 函数 r = connect(uv__stream_fd(handle), addr, addrlen); } while (r == -1 && errno == EINTR); /* We not only check the return value, but also check the errno != 0. * Because in rare cases connect() will return -1 but the errno * is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227) * and actually the tcp three-way handshake is completed. */ ... // 错误处理 // 创建一个请求 handle,设置连接后的回调函数 uv__req_init(handle->loop, req, UV_CONNECT); req->cb = cb; req->handle = (uv_stream_t*) handle; QUEUE_INIT(&req->queue); handle->connect_req = req; // 向 loop 新增一个 io 任务 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); ... return 0;}
总结
libuv 将 uv_listen
和 uv_tcp_connect
后发生的 io 事件封装到了 loop 里,每次轮询 loop 里的任务时都会处理这些 io 事件。探究 uv__io
便能理解 libuv 的异步模型。
划线
评论
复制
发布于: 2020 年 08 月 13 日阅读数: 127
版权声明: 本文为 InfoQ 作者【Huayra】的原创文章。
原文链接:【http://xie.infoq.cn/article/a4970f2298e4aec1b98e37bc4】。未经作者许可,禁止转载。
Huayra
关注
Gopher & Pythonista 2017.12.12 加入
还未添加个人简介
评论