学习东西有三个境界:了解、熟练使用、理解原理。大神一般都是第四个境界:根据原理造轮子。对于RabbitMQ,我目前大概在第二个境界。以下介绍均按照自己理解来,个人不保证完全正确,而且也认为肯定有些不准或错误的地方,建议仅作参考,欢迎指正。
一、RabbitMQ是一款开源企业级消息代理软件,实现了AMQP协议(和MQTT协议)。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议。
MQTT,即Message Queuing Telemetry Transport,是一种轻量级的、灵活的网络协议,常用于物联网。
官网:https://www.rabbitmq.com/
文档:https://www.rabbitmq.com/documentation.html
二、主要概念
- Virtual Host:
你可以参照windows概念:一机多用户。为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。Virtual Host是隔离的最高级别。 -
Exchange: 交换机
交换机用来发送AMQP实体消息。接收生产者的消息,将消息路由到[0, n](n>0,整数)个队列。消息内容可以是文本,也可以是二进制数据。不同类型的交换机有不同的消息路由策略。每个队列创建的时候,必须指定一个Exchange,这个Exchange是下面描述中的某一种: (以下中文名可能和网上的不一样)2.1 Direct exchange: 直连交换机。
根据消息携带的路由键(routing key)将消息投递给对应队列(完全匹配),可以发送给多个队列(消息复制)。2.2 Fanout exchange: 扇形交换机。
将消息发给绑定到该交换机的所有队列,忽略(队列绑定到exchange时绑定的)路由键。可以处理广播消息。2.3 Topic exchange: 主题交换机。
a. 根据消息携带的路由键,将消息发送给相应队列。和Direct不同的是,主题交换机的匹配规则可以是模糊匹配。 b. "#"(井号)表示任意数量(0个or多个)单词。当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。 c. "*"(星号)表示一个单词。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
2.4 Headers exchange: 消息头交换机/首部交换机。
a. 和http请求一样,每个RabbitMQ的消息也有消息头,消息头内部有很多键值对,这些键值对可以自己定义。 b. 头交换机使用一个或多个消息头(x-match可以为any或all)来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 c. x-match为any时,任意一个消息头符合即可。相应地,all表示必须满足所有定义的消息头。
- queue: 队列
队列需要连接到某具体的交换机(在代码中, channel创建时需要指定交换机,queue再通过channel发送/接收消息,channel是建立在tcp连接上的可复用逻辑通道,避免频繁的tcp连接)。 -
channel: 通道
4.1 见queue中的描述。
4.2 channel可以减少tcp频繁创建、断开的开销。
4.3 binding_key: 绑定
4.4 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
4.5 routing_key在某些交换机下会被忽略,相应地binding_key也会被忽略(binding_key的作用就是和routing_key做匹配)。
4.6 可以理解成binding_key就是生产者指定的"routing_key"。 -
消息消费模式
5.1 订阅-发布
5.2 消费者主动拉取 -
部分特性
6.1 消息持久化。消息持久化需要以下指标同时持久化:a. 交换机持久化 b. 队列持久化 c. 消息设置持久化(Delivery Mode => 2) d. 如果消息、队列设置了过期时间,过期后的消息会被发送到“死信队列”(如果设置了的话),否则丢弃。 e. 注意上面一条,持久化的消息也可能因为设置过期时间而被丢弃。
6.2 过期时间说明:Message的TTL("expiration")是从数据到达Queue中后开始计时的,而不是Message被创建时计时。Queue的TTL("x-expires")定义是Queue被自动删除前可以处于未使用状态的时间。如果你设置了Queue为自动删除,那么"x-expires"这个参数就有效。
官方解释:
[1] Auto-delete:queue that has had at least one consumer is deleted when last consumer unsubscribes[2] Expiry time can be set for a given queue by setting the x-expires argument to queue.declare, or by setting the expires policy. This controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers
一条消息什么时候过期是由两个因素决定的:Message中的"expiration",以及Queue中的"x-message-ttl",他们都是一种消息过期策略,消息具体过期时间以 min("x-expires", "x-message-ttl")为准。
6.3 消费者ack机制:
a. 在确认消息已被获取后,才删除消息; b. 自动确认ack机制: 如果消息不重要,自动ack的消息可以增加消息的消费速度。
6.4 生产者confirm机制:
a. 确认消息成功投递(到队列)后,执行回调函数; b. 不论是持久化,还是生产者confirm,对RabbitMQ吞吐量都有一定影响;