写点什么

libuv 异步网络编程之 TCP 源码分析

用户头像
Huayra
关注
发布于: 2020 年 08 月 13 日
libuv 异步网络编程之 TCP 源码分析

https://xie.infoq.cn/article/8eae86e25998538a12f11b7b3中,我们了解了 libuv 中 tcp 相关函数的使用;接下来,我们掀开 libuv 的外表,看看 libuv 是怎样对 tcp 相关的系统接口进行封装的。

libuv 版本:1.38.1,操作系统:macOS 10.15.6



tldr:tcp 的相关操作还是原来的 bind/listen/accept/connect,不一样的地方是将这些操作后的 io 事件封装到了异步模型里。

uv_tcp_bind

服务端在监听端口之前,需要创建 socket 并调用系统接口 bind 将 socket 绑定到指定地址。

int uv_tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int flags) {
unsigned int addrlen;
if (addr->sa_family == AF_INET) // IPv4
addrlen = sizeof(struct sockaddr_in);
else if (addr->sa_family == AF_INET6) // IPv6
addrlen = sizeof(struct sockaddr_in6);
return uv__tcp_bind(handle, addr, addrlen, flags);
}
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
int err;
int on;
// 因为我们没有创建 socket,maybe_new_socket 里会创建 socket
err = maybe_new_socket(tcp, addr->sa_family, 0);
if (err)
return err;
on = 1;
// 默认复用端口
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
return UV__ERR(errno);
errno = 0;
// 将创建的 socket fd 绑定到 addr
if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
if (errno == EAFNOSUPPORT)
/* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
* socket created with AF_INET to an AF_INET6 address or vice versa. */
return UV_EINVAL;
return UV__ERR(errno);
}
tcp->delayed_error = UV__ERR(errno);
tcp->flags |= UV_HANDLE_BOUND;
if (addr->sa_family == AF_INET6)
tcp->flags |= UV_HANDLE_IPV6;
return 0;
}
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
// 忽略已创建 socket 的情况
return new_socket(handle, domain, flags);
}
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
int sockfd;
int err;
// 创建 socket
err = uv__socket(domain, SOCK_STREAM, 0);
...
sockfd = err;
// 设置 tcp 选项;比如关闭拥塞控制 naggle 算法,开启 tcp 的 keepalive
// sockfd 将保存到 handle 里
err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
if (err) {
...
}
...
return 0;
}

uv_listen

开始监听端口,接收连接请求。

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
...
}
if (err == 0)
// 将 stream 的 handle 状态更新为 active
uv__handle_start(stream);
return err;
}
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept_cached = -1;
unsigned long flags;
int single_accept;
int err;
if (tcp->delayed_error)
return tcp->delayed_error;
... // 处理单次连接的情况
flags = 0;
// 对于已 uv_tcp_bind 的 socket,maybe_new_socket 不做任何有效操作
err = maybe_new_socket(tcp, AF_INET, flags);
// 开始监听端口,该端口就是 uv_tcp_bind 里指定的地址端口
if (listen(tcp->io_watcher.fd, backlog))
return UV__ERR(errno);
// 设置新连接的回调函数
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
/* Start listening for connections. */
// cb 将在 uv__server_io 里调用
tcp->io_watcher.cb = uv__server_io;
// 将 listen 的 io 事件加入到 loop 里
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}

uv__server_io

void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
...
// 新增一个 listen io 事件,因为原来的那个 io 事件已经被消费掉了
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
/* connection_cb can close the server socket while we're
* in the loop so check it on each iteration.
*/
while (uv__stream_fd(stream) != -1) {
// 接收新的 fd
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
// 错误处理
...
// 调用用户回调函数去处理错误
stream->connection_cb(stream, err);
continue;
}
UV_DEC_BACKLOG(w)
// 保存接收到的 fd 并调用用户设置的新连接回调函数
stream->accepted_fd = err;
stream->connection_cb(stream, 0);
// 判断回调函数是否已接收 accepted fd
// 处理单次连接的情况
}
}
}
int uv__accept(int sockfd) {
int peerfd;
int err;
do
// 真正的 accept 操作
peerfd = accept(sockfd, NULL, NULL);
while (peerfd == -1 && errno == EINTR);
// 设置 cloexec 和 nonblock
err = uv__cloexec(peerfd, 1);
err = uv__nonblock(peerfd, 1);
...
return peerfd;
}



uv_accept

获取已经 accepted 的 fd。

真正的 accept 操作不在这里,在 uv__server_io 里

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
switch (client->type) {
case UV_TCP:
// 将 accepted_fd 保存到 client
err = uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (err) {
...
}
break;
...
default:
return UV_EINVAL;
}
client->flags |= UV_HANDLE_BOUND;
done:
// 读取下一个 accepted_fd 获取新增一个 listen io 事件
/* Process queued fds */
...
return err;
}

uv_tcp_connect

int uv_tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
uv_connect_cb cb) {
unsigned int addrlen;
if (addr->sa_family == AF_INET)
addrlen = sizeof(struct sockaddr_in);
else if (addr->sa_family == AF_INET6)
addrlen = sizeof(struct sockaddr_in6);
return uv__tcp_connect(req, handle, addr, addrlen, cb);
}
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
int err;
int r;
// 对于 client 向 server 发起新连接的情况,maybe_new_socket 会新建一个 socket
err = maybe_new_socket(handle,
addr->sa_family,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
do {
errno = 0;
// 调用系统的 connect 函数
r = connect(uv__stream_fd(handle), addr, addrlen);
} while (r == -1 && errno == EINTR);
/* We not only check the return value, but also check the errno != 0.
* Because in rare cases connect() will return -1 but the errno
* is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227)
* and actually the tcp three-way handshake is completed.
*/
... // 错误处理
// 创建一个请求 handle,设置连接后的回调函数
uv__req_init(handle->loop, req, UV_CONNECT);
req->cb = cb;
req->handle = (uv_stream_t*) handle;
QUEUE_INIT(&req->queue);
handle->connect_req = req;
// 向 loop 新增一个 io 任务
uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
...
return 0;
}

总结

libuv 将 uv_listenuv_tcp_connect 后发生的 io 事件封装到了 loop 里,每次轮询 loop 里的任务时都会处理这些 io 事件。探究 uv__io 便能理解 libuv 的异步模型。

发布于: 2020 年 08 月 13 日阅读数: 127
用户头像

Huayra

关注

Gopher & Pythonista 2017.12.12 加入

还未添加个人简介

评论

发布
暂无评论
libuv 异步网络编程之 TCP 源码分析