These are short notes on how Tornado implements coroutines.
Let's start with a typical piece of Tornado web code:
    import tornado
    import tornado.gen
    import tornado.ioloop
    import tornado.web
    from tornado.gen import coroutine
    from tornado.httpclient import AsyncHTTPClient


    class GenHandler(tornado.web.RequestHandler):
        @coroutine
        def get(self):
            url = 'http://www.baidu.com'
            http_client = AsyncHTTPClient()
            # Each yield suspends the coroutine until the Future is done.
            response = yield http_client.fetch(url)
            yield tornado.gen.sleep(5)
            self.write(response.body)


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write('root')


    if __name__ == "__main__":
        application = tornado.web.Application([
            (r"/", MainHandler),
            (r"/gen_async/", GenHandler),
        ], autoreload=True)
        application.listen(8888)
        tornado.ioloop.IOLoop.current().start()
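The last line, tornado.ioloop.IOLoop.current().start(), starts the service. Keep three questions in mind while reading on: where is a coroutine's state saved while it is suspended, what is the "right moment" to resume it, and how exactly is it resumed?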
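The IOLoop class is essentially a wrapper around I/O multiplexing. It acts as the event loop and schedules the whole coroutine execution process.
In the IOLoop source, IOLoop inherits from Configurable, and PollIOLoop in turn inherits from IOLoop. When the IOLoop starts, it decides which multiplexing mechanism to use: epoll, kqueue, or select. (The source quoted in these notes comes from an older Tornado release, before 6.0, that still ships PollIOLoop and stack_context.)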
    # IOLoop class
    # configurable_default overrides the method defined on Configurable;
    # this is where the multiplexing mechanism is chosen.
    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
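The same capability check can be tried outside Tornado. The sketch below is not Tornado code: pick_poller_name is a hypothetical helper that only mirrors the order of the hasattr checks above, using the standard select module, and returns a name instead of an IOLoop class.

```python
import select


def pick_poller_name():
    """Return the name of the best multiplexing API this platform exposes.

    Hypothetical helper: same order of checks as configurable_default,
    but it returns a string rather than an IOLoop subclass.
    """
    if hasattr(select, "epoll"):      # Linux
        return "epoll"
    if hasattr(select, "kqueue"):     # BSD / macOS
        return "kqueue"
    return "select"                   # portable fallback


print(pick_poller_name())
```

The printed name depends on the platform, which is exactly why the choice is made at runtime.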
    # PollIOLoop class
    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
PollIOLoop's initialize method calls add_handler to register a handler for an event on a file descriptor, for example which function to call back when a socket becomes readable.
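To make the fd-to-handler bookkeeping concrete, here is a minimal sketch built on the standard selectors module. MiniLoop and demo are hypothetical names and this is not how PollIOLoop is written; it only imitates the add_handler / poll / dispatch pattern described above.

```python
import selectors
import socket


class MiniLoop:
    """Hypothetical stand-in for PollIOLoop: maps fds to handlers."""

    def __init__(self):
        self._impl = selectors.DefaultSelector()
        self._handlers = {}  # fd -> handler(fd, events)

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._impl.register(fd, events)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._impl.unregister(fd)

    def run_once(self, timeout=1.0):
        """Poll once and dispatch every ready fd to its handler."""
        for key, events in self._impl.select(timeout):
            self._handlers[key.fd](key.fd, events)

    def close(self):
        self._impl.close()


def demo():
    reader, writer = socket.socketpair()
    received = []
    loop = MiniLoop()

    def on_readable(fd, events):
        received.append(reader.recv(1024))

    loop.add_handler(reader.fileno(), on_readable, selectors.EVENT_READ)
    writer.send(b"ping")          # makes `reader` readable
    loop.run_once()               # poll reports the fd, handler runs
    loop.remove_handler(reader.fileno())
    loop.close()
    reader.close()
    writer.close()
    return received


print(demo())
```

The handler is never called by the code that registered it; it only runs when the poll call reports the fd as ready.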
The messenger between the IOLoop and coroutines: Future
    class Future(object):
        def __init__(self):
            self._result = None
            self._exc_info = None
            self._callbacks = []
            self.running = True

        def set_result(self, result): ...

        def set_exc_info(self, exc_info): ...

        def result(self): ...

        def exc_info(self): ...

        def add_done_callback(self, callback):
            self._callbacks.append(callback)
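A Future acts as a placeholder: the result of the asynchronous operation is written into it with set_result, which in turn triggers the callbacks registered through add_done_callback.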
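The placeholder idea fits in a few lines. MiniFuture below is a simplified, hypothetical class written for illustration; it leaves out exception handling and everything else Tornado's real Future does.

```python
class MiniFuture:
    """Simplified placeholder; not Tornado's real Future."""

    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready yet")
        return self._result

    def add_done_callback(self, callback):
        # Run immediately if the result is already there, otherwise queue.
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)


seen = []
future = MiniFuture()
future.add_done_callback(lambda f: seen.append(f.result()))
future.set_result("response body")   # fills the placeholder, fires callbacks
```

Whoever holds the Future does not need to know who will fill it in or when; it only needs to register what should happen afterwards.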
Runner: the object that resumes the coroutine
    class Runner(object):
        def __init__(self, gen, result_future, first_yielded):
            self.gen = gen
            self.result_future = result_future
            self.future = _null_future
            self.yield_point = None
            self.pending_callbacks = None
            self.results = None
            self.running = False
            self.finished = False
            self.had_exception = False
            self.io_loop = IOLoop.current()
            self.stack_context_deactivate = None
            # The initialization above can be skimmed.
            if self.handle_yield(first_yielded):
                gen = result_future = first_yielded = None
                self.run()

        def handle_yield(self, yielded):
            self.future = convert_yielded(yielded)

            if self.future is moment:
                self.io_loop.add_callback(self.run)
                return False
            elif not self.future.done():
                def inner(f):
                    # Break a reference cycle to speed GC.
                    f = None
                    self.run()
                # Not done yet: ask the IOLoop to call run() once it is.
                self.io_loop.add_future(
                    self.future, inner)
                return False
            return True

        def run(self):
            if self.running or self.finished:
                return
            try:
                self.running = True
                while True:
                    future = self.future
                    if not future.done():
                        return
                    self.future = None
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        exc_info = None

                        try:
                            value = future.result()
                        except Exception:
                            self.had_exception = True
                            exc_info = sys.exc_info()
                        future = None
                        # (abridged: when exc_info is set, the original
                        # throws it into the generator instead of sending)
                        yielded = self.gen.send(value)

                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        if self.pending_callbacks and not self.had_exception:
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        future_set_result_unless_cancelled(
                            self.result_future, _value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    except Exception:
                        # some cleanup
                        return
                    if not self.handle_yield(yielded):
                        return
                    yielded = None
            finally:
                self.running = False
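Judging from the constructor signature, a Runner is created for a coroutine's generator once it has yielded its first Future, and that Future is passed in as first_yielded. In run, the Runner takes the result of the finished Future and resumes the generator with self.gen.send(value); the generator then runs to its next yield, and the newly yielded Future goes through handle_yield. When that Future completes (is marked done), the callback registered for it fires and run is called again.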
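A stripped-down version of this resume loop can be written in plain Python. MiniFuture and MiniRunner are hypothetical names for illustration; unlike Tornado's Runner, this sketch primes the generator with an already-finished Future instead of receiving first_yielded, ignores exceptions, and calls callbacks directly rather than through an IOLoop.

```python
class MiniFuture:
    """Tiny placeholder object (assumption: not Tornado's Future)."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, callback):
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def set_result(self, value):
        self._done, self._result = True, value
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)


class MiniRunner:
    """Drives a generator that yields MiniFuture objects."""

    def __init__(self, gen, result_future):
        self.gen = gen
        self.result_future = result_future
        self.future = MiniFuture()
        self.future.set_result(None)   # primes the first gen.send(None)
        self.run()

    def run(self):
        while True:
            if not self.future.done():
                # Suspend: run() is called again when the future finishes.
                self.future.add_done_callback(lambda f: self.run())
                return
            try:
                # Resume the generator with the finished future's result.
                self.future = self.gen.send(self.future.result())
            except StopIteration as stop:
                self.result_future.set_result(stop.value)
                return


def fetch_twice(pending):
    first = yield pending[0]
    second = yield pending[1]
    return first + second


pending = [MiniFuture(), MiniFuture()]
final = MiniFuture()
MiniRunner(fetch_twice(pending), final)
pending[0].set_result(1)    # resumes the generator up to the second yield
pending[1].set_result(2)    # resumes it again; the generator returns
```

Note that what travels through send is the Future's result, not the Future itself; the Future only decides when the next send happens.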
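Answering the first question: where does the coroutine's state go?
The suspended frame itself stays inside the generator object that the Runner holds as self.gen. What gets queued is only the callback that will resume it: IOLoop.add_future calls add_callback, implemented in PollIOLoop, which wraps the callback with functools.partial and appends it to the _callbacks list, where it waits to be invoked.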
    # IOLoop.add_future
    def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

    # PollIOLoop.add_callback
    def add_callback(self, callback, *args, **kwargs):
        if thread.get_ident() != self._thread_ident:
            # Called from another thread: take the lock and wake the loop.
            with self._callback_lock:
                if self._closing:
                    return
                list_empty = not self._callbacks
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
                if list_empty:
                    self._waker.wake()
        else:
            if self._closing:
                return
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
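The split between "state" and "queued callback" can be observed with a bare generator. In this sketch handler, resume and the callbacks list are hypothetical stand-ins, and the URL is a placeholder; reading gi_frame.f_locals is a CPython introspection detail used here only to show where the locals live.

```python
import functools


def handler():
    url = "http://example.invalid/"       # local state before suspending
    body = yield "waiting for " + url
    return url, body


callbacks = []                  # plays the role of IOLoop._callbacks
gen = handler()
first_yield = next(gen)         # runs until the first yield, then suspends

# While suspended, the locals live in the generator's own frame.
saved_url = gen.gi_frame.f_locals["url"]


def resume(generator, value, out):
    """Send a value into the generator and record its return value."""
    try:
        generator.send(value)
    except StopIteration as stop:
        out.append(stop.value)


results = []
# Queue a partial, as add_callback does, instead of calling it right away.
callbacks.append(functools.partial(resume, gen, b"<html>", results))

# Later, the loop drains the queue and the generator continues
# from the exact point where it stopped.
for callback in callbacks:
    callback()
```

Nothing about the coroutine's locals is copied anywhere; the queue only stores a reference that leads back to the generator.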
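The second question: what is the "right moment"?
The IOLoop is really just a wrapper around I/O multiplexing: when the underlying epoll_wait call reports an event, the IOLoop's main thread is notified.
The following excerpt is where the IOLoop waits for multiplexing events and then handles them.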
    # (excerpt from the loop body of PollIOLoop.start)
    try:
        # Wait for events
        event_pairs = self._impl.poll(poll_timeout)
    except Exception as e:
        print("wait fail")
        if errno_from_exception(e) == errno.EINTR:
            continue
        else:
            raise

    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL,
                         self._blocking_signal_threshold, 0)

    # Handle events
    self._events.update(event_pairs)
    while self._events:
        fd, events = self._events.popitem()
        try:
            fd_obj, handler_func = self._handlers[fd]
            handler_func(fd_obj, events)
        except (OSError, IOError) as e:
            if errno_from_exception(e) == errno.EPIPE:
                pass
            else:
                self.handle_callback_exception(self._handlers.get(fd))
        except Exception:
            self.handle_callback_exception(self._handlers.get(fd))
    fd_obj = handler_func = None
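Because the loop blocks in poll until some fd is ready, a callback added from elsewhere has to make an fd ready to get noticed, which is what the _waker registered in initialize is for. The sketch below imitates that idea with a socket pair and select.select; MiniWaker is a hypothetical class and Tornado's actual Waker implementation may differ.

```python
import select
import socket


class MiniWaker:
    """Hypothetical, simplified take on the Waker idea using a socket pair."""

    def __init__(self):
        self.reader, self.writer = socket.socketpair()
        self.reader.setblocking(False)

    def fileno(self):
        return self.reader.fileno()

    def wake(self):
        self.writer.send(b"x")        # makes the reader end readable

    def consume(self):
        try:
            while self.reader.recv(1024):
                pass
        except BlockingIOError:
            pass                       # drained everything that was pending

    def close(self):
        self.reader.close()
        self.writer.close()


waker = MiniWaker()

# Nothing has happened yet: a zero-timeout poll reports no ready fds.
idle, _, _ = select.select([waker], [], [], 0)

waker.wake()
ready, _, _ = select.select([waker], [], [], 1.0)   # fd is now readable
waker.consume()
drained, _, _ = select.select([waker], [], [], 0)
waker.close()
```

In other words, the "right moment" is always the moment poll returns, whether because real I/O arrived or because something deliberately poked the loop.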
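The third question: how exactly is the coroutine resumed?
The Runner does not poll. Each time run is called it checks whether the current Future is done: if not, handle_yield has already registered run as a callback through add_future, and run simply returns. Once the Future is done, the IOLoop invokes that callback, run sends the result back into the generator with gen.send, and when the generator finishes its return value is written to result_future.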
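Summary
First, Tornado wraps the I/O multiplexing system calls to implement a non-blocking web server.
Second, Tornado implements coroutines with yield + Future + Runner: the coroutine yields a Future, the Runner waits for its result, and a callback scheduled on the IOLoop resumes the coroutine.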
References:
http://www.nodekey.com/tornado-yi-bu-yuan-ma-jie-xi/
https://blog.csdn.net/wyx819/article/details/45420017
https://yangyaq.github.io/2019/03/06/tornado的事件循环机制/