写点什么

tornado 的协程调度原理

作者:菜皮日记
  • 2023-09-08
    北京
  • 本文字数:5865 字

    阅读完需:约 19 分钟

本文讨论 tornado 的协程实现原理,简单做了一份笔记。

首先看一段最常见的 tornado web 代码:

import tornadoimport tornado.webimport tornado.genfrom tornado.gen import coroutinefrom tornado.httpclient import AsyncHTTPClient
class GenHandler(tornado.web.RequestHandler):    @coroutine    def get(self):        url = 'http://www.baidu.com'        http_client = AsyncHTTPClient()        response = yield http_client.fetch(url)        yield tornado.gen.sleep(5)        self.write(response.body)
class MainHanler(tornado.web.RequestHandler):    def get(self):        self.write('root')
if __name__ == "__main__":    application = tornado.web.Application([        (r"/", MainHanler),        (r"/gen_async/", GenHandler),    ], autoreload=True)    application.listen(8888)    tornado.ioloop.IOLoop.current().start()
复制代码

其中最后一行代码 tornado.ioloop.IOLoop.current().start() 启动服务。带着几个问题往下看:

  • 知道 yield 可以暂存执行状态,等「合适的时机」重新恢复执行,那么保存的状态到哪去了?

  • 上一个问题中「合适的时机」是到底是什么时候?

  • 继续接上一个问题,具体是怎么恢复执行的?

IOLoop 类相当于是对多路复用的封装,起到事件循环的作用,调度整个协程执行过程。

查看 IOLoop 的源码,可以看到 IOLoop 继承自 Configurable,PollIOLoop 又继承自 IOLoop。当 IOLoop 启动时,会确定使用哪一种多路复用方式,epoll、kqueue 还是 select?

# IOLoop 类# IOLoop 中的 configurable_default 方法是重写 Configurable 的# 这里会确定使用哪种多路复用方式@classmethoddef configurable_default(cls):    if hasattr(select, "epoll"):        from tornado.platform.epoll import EPollIOLoop        return EPollIOLoop    if hasattr(select, "kqueue"):        # Python 2.6+ on BSD or Mac        from tornado.platform.kqueue import KQueueIOLoop      return KQueueIOLoop    from tornado.platform.select import SelectIOLoop  return SelectIOLoop
复制代码


# PollIOLoop类def initialize(self, impl, time_func=None, **kwargs):    super(PollIOLoop, self).initialize(**kwargs)    self._impl = impl    if hasattr(self._impl, 'fileno'):        set_close_exec(self._impl.fileno())    self.time_func = time_func or time.time    self._handlers = {}    self._events = {}    self._callbacks = []    self._callback_lock = threading.Lock()    self._timeouts = []    self._cancellations = 0    self._running = False    self._stopped = False    self._closing = False    self._thread_ident = None    self._blocking_signal_threshold = None    self._timeout_counter = itertools.count()
    # Create a pipe that we send bogus data to when we want to wake    # the I/O loop when it is idle    self._waker = Waker()    self.add_handler(self._waker.fileno(),                     lambda fd, events: self._waker.consume(),                     self.READ)
def add_handler(self, fd, handler, events):    fd, obj = self.split_fd(fd)    self._handlers[fd] = (obj, stack_context.wrap(handler))    self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):    fd, obj = self.split_fd(fd)    self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):    fd, obj = self.split_fd(fd)    self._handlers.pop(fd, None)    self._events.pop(fd, None)    try:        self._impl.unregister(fd)    except Exception:        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
复制代码

PollIOLoop 中 initalize 方法中调用 add_handler 方法,注册对应事件的处理函数,如 socket 可读时,回调哪个函数去处理。

IOLoop 和协程之间的信使:Future

class Future(object):    def __init__(self):        self._result = None        self._exc_info = None        self._callbacks = []        self.running = True            def set_result(self, result):        ...            def set_exc_info(self, exce_info):        ...            def result(self):        ...        def exc_info(self):        ...            def add_done_callback(self, callback):        self._callbacks.append(callback)
复制代码

Future 对象起到“占位符”的作用,协程的执行结果会通过 set_result 方式写入其中,并调用通过 add_done_callback 设置的回调。

恢复唤醒协程的 Runner

class Runner(object):    def __init__(self, gen, result_future, first_yielded):        self.gen = gen        self.result_future = result_future        self.future = _null_future        self.yield_point = None        self.pending_callbacks = None        self.results = None        self.running = False        self.finished = False        self.had_exception = False        self.io_loop = IOLoop.current()        self.stack_context_deactivate = None        # 上面一堆不需要看的初始化        if self.handle_yield(first_yielded):            gen = result_future = first_yielded = None            self.run()          def handle_yield(self, yielded):
        self.future = convert_yielded(yielded)
        if self.future is moment:            self.io_loop.add_callback(self.run)            return False        elif not self.future.done():            def inner(f):                # Break a reference cycle to speed GC.                f = None                self.run()            self.io_loop.add_future(                self.future, inner)            return False        return True        def run(self):        if self.running or self.finished:            return        try:            self.running = True            while True:                future = self.future                if not future.done():                    return                self.future = None                try:                    orig_stack_contexts = stack_context._state.contexts                    exc_info = None
                    try:                        value = future.result()                    except Exception:                        self.had_exception = True                        exc_info = sys.exc_info()                    future = None                      yielded = self.gen.send(value)
                except (StopIteration, Return) as e:                    self.finished = True                    self.future = _null_future                    if self.pending_callbacks and not self.had_exception:                        raise LeakedCallbackError(                            "finished without waiting for callbacks %r" %                            self.pending_callbacks)                    future_set_result_unless_cancelled(self.result_future,_value_from_stopiteration(e))                    self.result_future = None                    self._deactivate_stack_context()                    return                except Exception:                    # 一些结束操作                    return                if not self.handle_yield(yielded):                    return                yielded = None        finally:            self.running = False
复制代码

协程每生成一个 Future,都会生成对应的一个 Runner,并将 Future 初始化注入都其中。Runner 的 run 方法中,通过 self.gen.send(Future) 来启动 Future,当 Future 完成时,将其设置成 done,并回调其预设的 callback。

回答第一个问题:协程的状态保存到哪去了:

IOLoop 中通过 add_future 调用实现类 PollIOLoop 中的 add_callback 方法,其中通过 functools 生成偏函数,放入 _callbacks 列表,等待被回调执行。

# IOLoop 的add_futuredef add_future(self, future, callback):    """Schedules a callback on the ``IOLoop`` when the given    `.Future` is finished.
    The callback is invoked with one argument, the    `.Future`.    """    assert is_future(future)    callback = stack_context.wrap(callback)    future.add_done_callback(        lambda future: self.add_callback(callback, future))
# PollIOLoop 的add_callbackdef add_callback(self, callback, *args, **kwargs):  if thread.get_ident() != self._thread_ident:      with self._callback_lock:          if self._closing:              return          list_empty = not self._callbacks          self._callbacks.append(functools.partial(              stack_context.wrap(callback), *args, **kwargs))          if list_empty:              self._waker.wake()  else:      if self._closing:          return      self._callbacks.append(functools.partial(          stack_context.wrap(callback), *args, **kwargs))
复制代码

第二个问题:「合适的时机」是什么?

IOLoop 实际上就是对多路复用的封装,当底层 epoll_wait 事件发生时,即会通知 IOLoop 主线程。

这一段是 IOLoop 中等待多路复用的事件,以及处理事件。

try:    # 等待事件   event_pairs = self._impl.poll(poll_timeout)except Exception as e:   print("wait fail")    if errno_from_exception(e) == errno.EINTR:       continue   else:       raiseif self._blocking_signal_threshold is not None:                    signal.setitimer(signal.ITIMER_REAL,                                     self._blocking_signal_threshold, 0)# 处理事件self._events.update(event_pairs)while self._events:    fd, events = self._events.popitem()    try:        fd_obj, handler_func = self._handlers[fd]        handler_func(fd_obj, events)    except (OSError, IOError) as e:        if errno_from_exception(e) == errno.EPIPE:            pass        else:            self.handle_callback_exception(self._handlers.get(fd))    except Exception:        self.handle_callback_exception(self._handlers.get(fd))fd_obj = handler_func = None
复制代码

第三个问题:具体是怎么恢复的。

Runner 通过不断 check Future 的状态,最后调用 callback 来返回结果。

总结

首先 tornado 对多路复用系统调用做了封装,来实现非阻塞 web 服务。

其次 tornado 通过 yield+Future+Runner 实现了生成 Future,Runner 监控结果,回调 callback 来实现协程的执行。

参考:

http://www.nodekey.com/tornado-yi-bu-yuan-ma-jie-xi/

https://blog.csdn.net/wyx819/article/details/45420017

https://yangyaq.github.io/2019/03/06/tornado的事件循环机制/

用户头像

菜皮日记

关注

全干程序员 2018-08-08 加入

还未添加个人简介

评论

发布
暂无评论
tornado 的协程调度原理_tornado_菜皮日记_InfoQ写作社区