写点什么

「源码解析」 消息队列 Kombu 基本架构综述,透过根源从而探究红黑树的本质

用户头像
极客good
关注
发布于: 刚刚
  • 通道 channel:信道是 “真实的” TCP 连接内的虚拟连接,AMQP 的命令都是通过通道发送的。在一条 TCP 连接上可以创建多条信道。有些应用需要与 AMQP 代理建立多个连接。同时开启多个 TCP 连接不合适,因为会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

  • 队列:存放消息的地方,队列通过路由键绑定到交换机,生产者通过交换机将消息发送到队列中。我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

  • Exchange 和 绑定:生产者发布消息时,先将消息发送到 Exchange,通过 Exchange 与队列的绑定规则将消息发送到队列。交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型绑定(Bindings)规则所决定的。交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

  • 常见的 Exchange 有 topic、fanout、direct:direct Exchange:direct 交换机是包含空白字符串的默认交换机,当声明队列时会主动绑定到默认交换机,并且以队列名称为路由键;fanout Exchange:这种交换机会将收到的消息广播到绑定的队列;topic Exchange:topic 交换机可以通过路由键的正则表达式将消息发送到多个队列;


1.2 工作过程


============


工作过程是:


  • 发布者(Publisher)发布消息(Message),经由交换机(Exchange)。消息从来不直接发送给队列,甚至 Producers 都可能不知道队列的存在。 消息是发送给交换机,给交换机发送消息时,需要指定消息的 routing_key 属性;

  • 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。交换机收到消息后,根据 交换机的类型,


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


或直接发送给队列 (fanout), 或匹配消息的 routing_key 和 队列与交换机之间的 banding_key。 如果匹配,则递交消息给队列;


  • 最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。Consumers 从队列取得消息;


基本如下图:


+----------------------------------------------+


| AMQP Entity |


| |


| |


| |


+-----------+ | +------------+ binding +---------+ | +------------+


| | | | | | | | | |


| Publisher | +------> | Exchange | +---------> | Queue | +--------> | Consumer |


| | | | | | | | | |


+-----------+ | +------------+ +---------+ | +------------+


| |


| |


+----------------------------------------------+


0x02 Poll 系列模型


=================


Kombu 利用了 Poll 模型,所以我们有必要介绍下。这就是 IO 多路复用。


IO 多路复用是指内核一旦发现进程指定的一个或者多个 IO 条件准备读取,它就通知该进程。IO 多路复用适用比如当客户处理多个描述字时(一般是交互式输入和网络套接口)。


与多进程和多线程技术相比,I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。


2.1 select


==============


select 通过一个 select()系统调用来监视多个文件描述符的数组(在 linux 中一切事物皆文件,块设备,socket 连接等)。


当 select()返回后,该数组中就绪的文件描述符便会被内核修改标志位(变成 ready),使得进程可以获得这些文件描述符从而进行后续的读写操作(select 会不断监视网络接口的某个目录下有多少文件描述符变成 ready 状态【在网络接口中,过来一个连接就会建立一个'文件'】,变成 ready 状态后,select 就可以操作这个文件描述符了)。


2.2 poll


============


poll 和 select 在本质上没有多大差别,但是 poll 没有最大文件描述符数量的限制。


poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。


select()和 poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行 IO 操作,那么下次调用 select()和 poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。


2.3 epoll


=============


epoll 由内核直接支持,可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些。


epoll 同样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。


另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而 epoll 事先通过 epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调 机制,迅速激活这个文件描述符,当进程调用 epoll_wait()时便得到通知。


2.4 通俗理解


============


2.4.1 阻塞 I/O 模式


=================


阻塞 I/O 模式下,内核对于 I/O 事件的处理是阻塞或者唤醒,一个线程只能处理一个流的 I/O 事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。


2.4.2 非阻塞模式


===============


非阻塞忙轮询的 I/O 方式可以同时处理多个流。我们只要不停地把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费 CPU。


2.4.2.1 代理模式


================


非阻塞模式下可以把 I/O 事件交给其他对象(select 以及 epoll)处理甚至直接忽略。


为了避免 CPU 空转,可以引进一个代理(一开始有一位叫做 select 的代理,后来又有一位叫做 poll 的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的 I/O 事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:


while true {


select(streams[])


for i in streams[] {


if i has data


read until unavailable


}


}


于是,如果没有 I/O 事件产生,我们的程序就会阻塞在 select 处。但是依然有个问题,我们从 select 那里仅仅知道了,有 I/O 事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。


2.4.2.2 epoll


=================


epoll 可以理解为 event poll,不同于忙轮询和无差别轮询,epoll 只会把哪个流发生了怎样的 I/O 事件通知我们。此时我们对这些流的操作都是有意义的(复杂度降低到了 O(1))。


epoll 版服务器实现原理类似于 select 版服务器,都是通过某种方式对套接字进行检验其是否能收发数据等。但是 epoll 版的效率要更高,同时没有上限。


在 select、poll 中的检验,是一种被动的轮询检验,而 epoll 中的检验是一种主动的事件通知检测,即:当有套接字符合检验的要求,便会主动通知,从而进行操作。这样的机制自然效率会高一点。


同时在 epoll 中要用到文件描述符,所谓文件描述符实质上是数字。


epoll 的主要用处在于:


epoll_list = epoll.epoll()


如果进程在处理 while 循环中的代码时,一些套接字对应的客户端如果发来了数据,那么操作系统底层会自动地把这些套接字对应的文件描述符写入该列表中,当进程再次执行到 epoll 时,就会得到了这个列表,此时这个列表中的信息就表示着哪些套接字可以进行收发了。因为 epoll 没有去依次地查看,而是直接拿走已经可以收发的 fd,所以效率高!


0x03 Kombu 基本概念


===================


Kombu 的最初的实现叫做 carrot,后来经过重构才成了 Kombu。


3.1 用途


==========


Kombu 主要用途如下:


  • Celery 是 Python 中最流行的异步消息队列框架,支持 RabbitMQ、Redis、ZoopKeeper 等作为 Broker,而对这些消息队列的抽象,都是通过 Kombu 实现的:Celery 一开始先支持的 RabbitMQ,也就是使用 AMQP 协议。由于要支持越来越多的消息代理,但是这些消息代理是不支持 AMQP 协议的,需要一个东西把所有的消息代理的处理方式统一起来,甚至可以理解为把它们「伪装成支持 AMQ 协议」。Kombu 实现了对 AMQP transport 和 non-AMQP transports(Redis、Amazon SQS、ZoopKeeper 等)的兼容。

  • OpenStack 默认 是使用 kombu 连接 rabbitmq 服务器。OpenStack 使用 kombu 作为消息队列使用的 client 库而没有用广泛使用的 pika 库有两个原因:kombu 除了支持纯 AMQP 的实现还支持虚拟 AMQP 的实现作为消息队列系统,如 redis、mongodb、beantalk 等。kombu 可以通过配置设置 AMQP 连接的底层库,比如 librabbitmq 或者 pyamqp。前者是一个 python 嫁接 C 库的实现,后者是一个纯 python 的实现。如果用纯 python 实现的 AMQP 库,就可以应用 eventlet 的框架将设计网络 IO 的部分变为协程,提高整体的网络 IO 性能。如 openstack 内部使用的就是 eventlet 的框架。


3.2 术语


==========


在 Kombu 中,存在多个概念(部分和 AMQP 类似),他们分别是:


  • Message:消息,发送和消费的主体,生产消费的基本单位,其实就是我们所谓的一条条消息;

  • Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接;Connection 是 AMQP 对 连接的封装;

  • Channel:与 AMQP 中概念类似,可以理解成共享一个 Connection 的多个轻量化连接;Channel 是 AMQP 对 MQ 的操作的封装;

  • Transport:kombu 支持将不同的消息中间件以插件的方式进行灵活配置,使用 transport 这个术语来表示一个具体的消息中间件,可以认为是对 broker 的抽象:对 MQ 的操作必然离不开连接,但是,Kombu 并不直接让 Channel 使用 Connection 来发送/接受请求,而是引入了一个新的抽象 Transport,Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行。引入 transport 这个抽象概念可以使得后续添加对 non-AMQP 的 transport 非常简单;Transport 是真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例;当前 Kombu 中 build-in 支持有 Redis、Beanstalk、Amazon SQS、CouchDB,、MongoDB,、ZeroMQ,、ZooKeeper、SoftLayer MQ 和 Pyro;

  • Producers: 发送消息的抽象类;

  • Consumers:接受消息的抽象类。consumer 需要声明一个 queue,并将 queue 与指定的 exchange 绑定,然后从 queue 里面接收消息;

  • Exchange:MQ 路由,这个和 RabbitMQ 差不多,支持 5 类型。消息发送者将消息发至 Exchange,Exchange 负责将消息分发至队列。用于路由消息(消息发给 exchange,exchange 发给对应的 queue)。路由就是比较 routing-key(这个 message 提供)和 binding-key(这个 queue 注册到 exchange 的时候提供)。使用时,需要指定 exchange 的名称和类型(direct,topic 和 fanout)。交换机通过匹配消息的 routing_key 和 binding_key 来转发消息,binding_key 是 consumer 声明队列时与交换机的绑定关系。

  • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange 负责将消息分发 Queue,消费者从 Queue 接收消息。

  • Routing keys: 每个消息在发送时都会声明一个 routing_key。routing_key 的含义依赖于 exchange 的类型。一般说来,在 AMQP 标准里定义了四种默认的 exchange 类型,此外,vendor 还可以自定义 exchange 的类型。最常用的三类 exchange 为:Direct exchange: 如果 message 的 routing_key 和某个 consumer 中的 routing_key 相同,就会把消息发送给这个 consumer 监听的 queue 中。Fan-out exchange: 广播模式。exchange 将收到的 message 发送到所有与之绑定的 queue 中。Topic exchange: 该类型 exchange 会将 message 发送到与之 routing_key 类型相匹配的 queue 中。routing_key 由一系列“.”隔开的 word 组成,“*”代表匹配任何 word,“#”代表匹配 0 个或多个 word,类似于正则表达式。


0x04 概念具体说明


===============


4.1 概述


==========


以 redis 为 broker,我们简要说明:


  • 发送消息的对象,称为生产者 Producer。

  • connections 建立 redis 连接,channel 是一次连接会话。

  • Exchange 负责交换消息,消息通过 channel 发送到 Exchange,由于 Exchange 绑定 Queue 和 routing_key。消息会被转发到 redis 中匹配 routing_key 的 Queue 中。

  • 在 Queue 另一侧的消费者 Consumer 一直对 Queue 进行监听,一旦 Queue 中存在数据,则调用 callback 方法处理消息。


4.2 Connection


==================


Connection 是对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接。现在就是对 'redis://localhost:6379' 连接进行抽象。


conn = Connection('redis://localhost:6379')


由之前论述可知,Connection 是到 broker 的连接。从具体代码可以看出,Connection 更接近是一个逻辑概念,具体功能都委托给别人完成。


Connection 主要成员变量是:


  • _connection:kombu.transport.redis.Transport 类型,就是真正用来负责具体的 MQ 的操作,也就是说对 Channel 的操作都会落到 Transport 上执行。

  • _transport:就是上面提到的对 broker 的抽象。

  • cycle:与 broker 交互的调度策略。

  • failover_strategy:在连接失效时,选取其他 hosts 的策略。

  • heartbeat:用来实施心跳。


精简版定义如下:


class Connection:


"""A connection to the broker"""


port = None


_connection = None


_default_channel = None


_transport = None


#: Iterator returning the next broker URL to try in the event


#: of connection failure (initialized by :attr:failover_strategy).


cycle = None


#: Additional transport specific options,


#: passed on to the transport instance.


transport_options = None


#: Strategy used to select new hosts when reconnecting after connection


#: failure. One of "round-robin", "shuffle" or any custom iterator


#: constantly yielding new URLs to try.


failover_strategy = 'round-robin'


#: Heartbeat value, currently only supported by the py-amqp transport.


heartbeat = None


failover_strategies = failover_strategies


4.3 Channel


===============


Channel:与 AMQP 中概念类似,可以理解成共享一个 Connection 的多个轻量化连接。就是真正的连接。


  • Connection 是 AMQP 对 连接的封装;

  • Channel 是 AMQP 对 MQ 的操作的封装;


Channel 可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。


4.3.1 定义


============


简化版定义如下:


class Channel(virtual.Channel):


"""Redis Channel."""


QoS = QoS


_client = None


_subclient = None


keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)


keyprefix_fanout = '/{db}.'


sep = '\x06\x16'


_fanout_queues = {}


unacked_key = '{p}unacked'.format(p=KEY_PREFIX)


unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)


unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)


unacked_mutex_expire = 300 # 5 minutes


unacked_restore_limit = None


visibility_timeout = 3600 # 1 hour


max_connections = 10


queue_order_strategy = 'round_robin'


_async_pool = None


_pool = None


from_transport_options = (


virtual.Channel.from_transport_options +


('sep',


'ack_emulation',


'unacked_key',


......


'max_connections',


'health_check_interval',


'retry_on_timeout',


'priority_steps') # <-- do not add comma here!


)


connection_class = redis.Connection if redis else None


self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}


4.3.2 redis 消息回调函数


=====================


关于上面成员变量,这里需要说明的是


handlers = {dict: 2}


{


'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>,


'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>


}


这是 redis 有消息时的回调函数,即:


  • BPROP 有消息时候,调用 Channel._brpop_read;

  • LISTEN 有消息时候,调用 Channel._receive;


大约如下:


+---------------------------------------------------------------------------------------------------------------------------------------+


| +--------------+ 6 parse_response |


| +--> | Linux Kernel | +---+ |


| | +--------------+ | |


| | | |


| | | event |


| | 1 | |


| | | 2 |


| | | |


+-------+---+ socket + | |


| redis | <------------> port +--> fd +--->+ v |


| | | +------+--------+ |


| | socket | | Hub | |


| | <------------> port +--> fd +--->----------> | | |


| port=6379 | | | | |


| | socket | | readers +-----> Transport.on_readable |


| | <------------> port +--> fd +--->+ | | + |


+-----------+ +---------------+ | |


| |


3 | |


+----------------------------------------------------------------------------------------+ |


| v


| _receive_callback


| 5 +-------------+ +-----------+


+------------+------+ +-------------------------+ 'BRPOP' = Channel._brpop_read +-----> | Channel | +------------------> | Consumer |


| Transport | | MultiChannelPoller | +------> channel . handlers 'LISTEN' = Channel._receive +-------------+ +---+-------+


| | | | | 8 |


| | on_readable(fileno) | | | ^ |


| cycle +---------------------> | _fd_to_chan +----------------> channel . handlers 'BRPOP' = Channel._brpop_read | |


| | 4 | | | 'LISTEN' = Channel._receive | |


| _callbacks[queue]| | | | | on_m | 9


| + | +-------------------------+ +------> channel . handlers 'BRPOP' = Channel._brpop_read | |


+-------------------+ 'LISTEN' = Channel._receive | |


| | v


| 7 _callback |


+-----------------------------------------------------------------------------------------------------------------------------------------+ User Function


手机如图:



4.4 Transport


=================


Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例。就是存储和发送消息的实体,用来区分底层消息队列是用 amqp、Redis 还是其它实现的。


我们顺着上文理一下:


  • Connection 是 AMQP 对 连接的封装;

  • Channel 是 AMQP 对 MQ 的操作的封装;

  • 那么两者的关系就是对 MQ 的操作必然离不开连接,但是 Kombu 并不直接让 Channel 使用 Connection 来发送/接受请求,而是引入了一个新的抽象 Transport,Transport 负责具体的 MQ 的操作,也就是说 Channel 的操作都会落到 Transport 上执行;


在 Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过 Kombu,开发者可以根据实际需求灵活的选择或更换 broker。


Transport 负责具体操作,但是 很多操作移交给 loop 与 MultiChannelPoller 进行。


其主要成员变量为:


  • 本 transport 的驱动类型,名字;

  • 对应的 Channel;

  • cycle:MultiChannelPoller,具体下文会提到;


其中重点是 MultiChannelPoller。一个 Connection 有一个 Transport, 一个 Transport 有一个 MultiChannelPoller,对 poll 操作都是由 MultiChannelPoller 完成,redis 操作由 channel 完成。


定义如下:


class Transport(virtual.Transport):


"""Redis Transport."""


Channel = Channel


polling_interval = None # disable sleep between unsuccessful polls.


default_port = DEFAULT_PORT


driver_type = 'redis'


driver_name = 'redis'


implements = virtual.Transport.implements.extend(


asynchronous=True,


exchange_type=frozenset(['direct', 'topic', 'fanout'])


)


def init(self, *args, **kwargs):


super().init(*args, **kwargs)

All channels share the same poller.

self.cycle = MultiChannelPoller()


4.5 MultiChannelPoller


==========================


MultiChannelPoller 定义如下,可以理解为 执行 engine,主要作用是:


  • 收集 channel;

  • 建立 socks fd 到 channel 的映射;

  • 建立 channel 到 socks fd 的映射;

  • 使用 poll;


或者从逻辑上这么理解,MultiChannelPoller 就是:


  • 把 Channel 对应的 socket 同 poll 联系起来,一个 socket 在 linux 系统中就是一个 file,就可以进行 poll 操作;

  • 把 poll 对应的 fd 添加到 MultiChannelPoller 这里,这样 MultiChannelPoller 就可以 打通 Channel ---> socket ---> poll ---> fd ---> 读取 redis 这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取;


具体定义如下:


class MultiChannelPoller:


"""Async I/O poller for Redis transport."""


eventflags = READ | ERR


def init(self):

active channels

self._channels = set()

file descriptor -> channel map.

self._fd_to_chan = {}

channel -> socket map

self._chan_to_sock = {}

poll implementation (epoll/kqueue/select)

self.poller = poll()

one-shot callbacks called after reading from socket.

self.after_read = set()


4.6 Consumer


================


Consumer 是消息接收者。Consumer & 相关组件 的作用主要如下:


  • Exchange:MQ 路由,消息发送者将消息发至 Exchange,Exchange 负责将消息分发至队列。

  • Queue:对应的队列抽象,存储着即将被应用消费掉的消息,Exchange 负责将消息分发 Queue,消费者从 Queue 接收消息;

  • Consumers 是接受消息的抽象类,consumer 需要声明一个 queue,并将 queue 与指定的 exchange 绑定,然后从 queue 里面接收消息。就是说,从用户角度,知道了一个 exchange 就可以从中读取消息,而具体这个消息就是从 queue 中读取的。


在具体 Consumer 的实现中,它把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问 redis,也有 Exchange,知道访问具体 redis 哪个 key(就是 queue 对应的那个 key)。


Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。


所以服务端的逻辑大致为:


  1. 建立连接;

  2. 创建 Exchange ;

  3. 创建 Queue,并将 Exchange 与 Queue 绑定,Queue 的名称为 routing_key ;

  4. 创建 Consumer 对 Queue 监听;


Consumer 定义如下:


class Consumer:


"""Message consumer.


Arguments:


channel (kombu.Connection, ChannelT): see :attr:channel.


queues (Sequence[kombu.Queue]): see :attr:queues.


no_ack (bool): see :attr:no_ack.


auto_declare (bool): see :attr:auto_declare


callbacks (Sequence[Callable]): see :attr:callbacks.


on_message (Callable): See :attr:on_message


on_decode_error (Callable): see :attr:on_decode_error.


prefetch_count (int): see :attr:prefetch_count.


"""


ContentDisallowed = ContentDisallowed


#: The connection/channel to use for this consumer.


channel = None


#: A single :class:~kombu.Queue, or a list of queues to


#: consume from.


queues = None


#: Flag for automatic message acknowledgment.


no_ack = None


#: By default all entities will be declared at instantiation, if you


#: want to handle this manually you can set this to :const:False.


auto_declare = True


#: List of callbacks called in order when a message is received.


callbacks = None


#: Optional function called whenever a message is received.


on_message = None


#: Callback called when a message can't be decoded.


on_decode_error = None


#: List of accepted content-types.


accept = None


#: Initial prefetch count


prefetch_count = None


#: Mapping of queues we consume from.


_queues = None


_tags = count(1) # global


此时总体逻辑如下图:


+----------------------+ +-------------------+


| Consumer | | Channel |


| | | | +-----------------------------------------------------------+


| | | client +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |


| channel +--------------------> | | +-----------------------------------------------------------+


| | | pool |


| | +---------> | | <------------------------------------------------------------+


| queues | | | | |


| | | +----> | connection +---------------+ |


| | | | | | | | |


+----------------------+ | | +-------------------+ | |


| | | v |


| | | +-------------------+ +---+-----------------+ +--------------------+ |


| | | | Connection | | redis.Transport | | MultiChannelPoller | |


| | | | | | | | | |


| | | | | | | | _channels +--------+


| | | | | | cycle +------------> | _fd_to_chan |

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
「源码解析」 消息队列Kombu基本架构综述,透过根源从而探究红黑树的本质