使用pm2增删改常驻进程脚本

业务中使用pm2来管理MQ的消费者(需常驻),使用pm2有个好处就是,如果进程意外退出了,pm2可以将它自动重启。下面介绍常见的使用方法:新增、删除、修改。

一、新增一个常驻进程

1.1 先创建一个json配置文件,比如:


{ /** * docs: http://pm2.keymetrics.io/docs/usage/application-declaration/#attributes-available */ name: "cront/Trade/deliveyNotice", args: "cront/Trade/deliveyNotice", script: "/opt/ci123/www/html/api_shop/webroot/index.php", exec_interpreter: "/opt/ci123/php/bin/php", exec_mode: "fork", max_memory_restart: "100M", out_file: "/tmp/api_shop/Delivery.log", error_file: "/tmp/api_shop/Delivery.log", instances: 1 }

我们使用代码位置(cront/Trade/deliveyNotice)作为name,方便管理。args是在执行script时加在其后面的参数。

1.2 启动脚本


pm2 start /path/to/your/config.json

start后接上一步写的json配置文件。

二、删除常驻进程

2.1 首先找到我们要删除/关闭的进程在pm2里的id是多少。


// e.g. pm2 list|grep cront/Trade/deliveyNotice pm2 list|grep xxx

pm2 list可以查看所有pm2管理的进程,我们根据关键字把需要删除的进程筛选出来,找到id。

2.2 从pm2中删除进程


pm2 delete id

id是上一步找到的进程id(进程在pm2中的id,不是pid)。

三、修改常驻进程,并重启

pm2有restart和reload命令。

一般认为restart的意思是先关闭旧进程,再重启一个新的进程。
而reload的意思是先重启一个新的进程,再关闭旧进程。


pm2 restart /path/to/your/config.json #或者 pm2 reload /path/to/your/config.json

restart和reload都不能真正实现进程的平滑重启。只有将消费进程改造后,才可以实现平滑重启。

四、其他

4.1 pm2也支持将所有config写在一个json文件里;

4.2 使用pm2管理常驻进程的初衷,是想借助pm2的进程重启功能;其他一些工具比如supervisor也有类似功能。

4.3 pm2管理非node进程时,只能使用fork模式,不能使用cluster模式;

4.4 官方对配置文件的说明: Process File

RabbitMQ-匿名队列

匿名队列

创建RabbitMQ队列的时候,如果没有指定队列名,系统会自动创建一个随机字符串作为队列名,比如:

这是RabbitMQ规定的。匿名队列不是没有名字,而是名字是由系统自动、随机生成的。

匿名队列 vs 具名队列

匿名队列的好处是方便,但是队列名在RabbitMQ里有它的独特用处。当多个Consumer同时订阅一个队列上的同一种消息(比如:Direct交换机,Consumer的binding_key相同),RabbitMQ会在这些Consumer之间轮流投递消息。

总结起来,各自优缺点如下:

  1. 需要消息持久化时,队列名必不可少(虽然匿名队列本质上也有名字,但是很难记对不?故障恢复的时候怎么办呢?);
  2. 匿名队列更适合用在临时环境,一般匿名+队列自动删除同时使用,解决临时队列使用后的维护问题;
  3. 具名队列在迁移Consumer时更具优势,可以先在新服务器上运行Consumer,再关闭旧服务器上的Consumer;
  4. 匿名队列不需要取名字,写法上要简洁写。

总体上,如果业务里要使用队列,建议首先考虑具名队列。如果队列太多怎么办呢?建议使用消息总线,免去维护的烦恼。

RabbitMQ入门

学习东西有三个境界:了解、熟练使用、理解原理。大神一般都是第四个境界:根据原理造轮子。对于RabbitMQ,我目前大概在第二个境界。以下介绍均按照自己理解来,个人不保证完全正确,而且也认为肯定有些不准或错误的地方,建议仅作参考,欢迎指正。

一、RabbitMQ是一款开源企业级消息代理软件,实现了AMQP协议(和MQTT协议)。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议。
MQTT,即Message Queuing Telemetry Transport,是一种轻量级的、灵活的网络协议,常用于物联网。
官网:https://www.rabbitmq.com/
文档:https://www.rabbitmq.com/documentation.html

二、主要概念

  1. Virtual Host:
    你可以参照windows概念:一机多用户。为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。Virtual Host是隔离的最高级别。

  2. Exchange: 交换机
    交换机用来发送AMQP实体消息。接收生产者的消息,将消息路由到[0, n](n>0,整数)个队列。消息内容可以是文本,也可以是二进制数据。不同类型的交换机有不同的消息路由策略。每个队列创建的时候,必须指定一个Exchange,这个Exchange是下面描述中的某一种: (以下中文名可能和网上的不一样)

    2.1 Direct exchange: 直连交换机。
    根据消息携带的路由键(routing key)将消息投递给对应队列(完全匹配),可以发送给多个队列(消息复制)。

    2.2 Fanout exchange: 扇形交换机。
    将消息发给绑定到该交换机的所有队列,忽略(队列绑定到exchange时绑定的)路由键。可以处理广播消息。

    2.3 Topic exchange: 主题交换机。

    a. 根据消息携带的路由键,将消息发送给相应队列。和Direct不同的是,主题交换机的匹配规则可以是模糊匹配。
    
    b. "#"(井号)表示任意数量(0个or多个)单词。当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    
    c. "*"(星号)表示一个单词。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
    

    2.4 Headers exchange: 消息头交换机/首部交换机。

    a. 和http请求一样,每个RabbitMQ的消息也有消息头,消息头内部有很多键值对,这些键值对可以自己定义。
    
    b. 头交换机使用一个或多个消息头(x-match可以为any或all)来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
    
    c. x-match为any时,任意一个消息头符合即可。相应地,all表示必须满足所有定义的消息头。
    
  3. queue: 队列
    队列需要连接到某具体的交换机(在代码中, channel创建时需要指定交换机,queue再通过channel发送/接收消息,channel是建立在tcp连接上的可复用逻辑通道,避免频繁的tcp连接)。

  4. channel: 通道
    4.1 见queue中的描述。
    4.2 channel可以减少tcp频繁创建、断开的开销。
    4.3 binding_key: 绑定
    4.4 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
    4.5 routing_key在某些交换机下会被忽略,相应地binding_key也会被忽略(binding_key的作用就是和routing_key做匹配)。
    4.6 可以理解成binding_key就是生产者指定的"routing_key"。

  5. 消息消费模式
    5.1 订阅-发布
    5.2 消费者主动拉取

  6. 部分特性
    6.1 消息持久化。消息持久化需要以下指标同时持久化:

    a. 交换机持久化
    
    b. 队列持久化
    
    c. 消息设置持久化(Delivery Mode => 2)
    
    d. 如果消息、队列设置了过期时间,过期后的消息会被发送到“死信队列”(如果设置了的话),否则丢弃。
    
    e. 注意上面一条,持久化的消息也可能因为设置过期时间而被丢弃。
    

    6.2 过期时间说明:Message的TTL("expiration")是从数据到达Queue中后开始计时的,而不是Message被创建时计时。Queue的TTL("x-expires")定义是Queue被自动删除前可以处于未使用状态的时间。如果你设置了Queue为自动删除,那么"x-expires"这个参数就有效。

    官方解释:
    [1] Auto-delete:queue that has had at least one consumer is deleted when last consumer unsubscribes

    [2] Expiry time can be set for a given queue by setting the x-expires argument to queue.declare, or by setting the expires policy. This controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers

    一条消息什么时候过期是由两个因素决定的:Message中的"expiration",以及Queue中的"x-message-ttl",他们都是一种消息过期策略,消息具体过期时间以 min("x-expires", "x-message-ttl")为准。

    6.3 消费者ack机制:

    a. 在确认消息已被获取后,才删除消息;
    
    b. 自动确认ack机制: 如果消息不重要,自动ack的消息可以增加消息的消费速度。
    

    6.4 生产者confirm机制:

    a. 确认消息成功投递(到队列)后,执行回调函数;
    
    b. 不论是持久化,还是生产者confirm,对RabbitMQ吞吐量都有一定影响;