
Implementing RabbitMQ with php-amqplib

Author: Owen Zhang
  • January 18, 2022


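Environment used in this article: Ubuntu 20.04, Nginx 1.8, PHP 7.3, RabbitMQ 3.9.

If anything is unclear, leave a comment or email me at owen@owenzhang.com

Copyright belongs to OwenZhang. For commercial reuse, contact OwenZhang for authorization; for non-commercial reuse, please credit the source.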

Project code

https://gitee.com/owenzhang24/tp5

Queue notes

1: Listing queues


To inspect the RabbitMQ queues and see how many messages each one holds, you can (as a privileged user) use the rabbitmqctl tool:


$ sudo rabbitmqctl list_queues


On Windows, omit sudo:


rabbitmqctl.bat list_queues


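2: Work queues


Even if a worker process is killed with CTRL+C, no message is lost. Once the worker dies, every message it had not acknowledged is redelivered.


An easy mistake to make is forgetting basic_ack, and the consequences are serious. The messages are redelivered when your program exits, and because RabbitMQ cannot release unacknowledged messages, it uses more and more memory.


To debug this kind of mistake, use rabbitmqctl to print the messages_unacknowledged field: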


$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged


On Windows, drop the sudo:


$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
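To make the two columns in that output more concrete, here is a toy in-memory model of a single queue. It is only a sketch: it does not talk to RabbitMQ or use php-amqplib, and the class ToyQueue and all of its methods are hypothetical names invented for this illustration. It shows a message moving from "ready" to "unacknowledged" on delivery, and back to "ready" when the worker dies before calling basic_ack.

```php
<?php
// Toy in-memory model of one queue; NOT RabbitMQ or php-amqplib code.
// ToyQueue and its methods are hypothetical names used for illustration.
final class ToyQueue
{
    /** @var string[] messages waiting to be delivered */
    private $ready = [];
    /** @var array<int,string> delivered but not yet acknowledged, keyed by delivery tag */
    private $unacked = [];
    private $nextTag = 1;

    public function publish(string $body): void
    {
        $this->ready[] = $body;
    }

    /** Deliver the next ready message and return its delivery tag, or null if none. */
    public function deliver(): ?int
    {
        if ($this->ready === []) {
            return null;
        }
        $tag = $this->nextTag++;
        $this->unacked[$tag] = array_shift($this->ready);
        return $tag;
    }

    /** The consumer confirms the message; only now is it discarded. */
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    /** The consumer died: everything it had not acknowledged goes back to ready. */
    public function consumerDied(): void
    {
        $this->ready = array_merge(array_values($this->unacked), $this->ready);
        $this->unacked = [];
    }

    /** Mirrors the two counters requested from list_queues above. */
    public function counts(): array
    {
        return [
            'messages_ready' => count($this->ready),
            'messages_unacknowledged' => count($this->unacked),
        ];
    }
}

$queue = new ToyQueue();
$queue->publish('task 1');
$queue->publish('task 2');

$first = $queue->deliver();   // worker receives "task 1"
$second = $queue->deliver();  // worker receives "task 2"
$queue->ack($first);          // only "task 1" is acknowledged

$beforeCrash = $queue->counts(); // 0 ready, 1 unacknowledged
$queue->consumerDied();          // worker is killed before acking "task 2"
$afterCrash = $queue->counts();  // 1 ready, 0 unacknowledged

print_r($beforeCrash);
print_r($afterCrash);
```

A worker that never acknowledges would keep the unacknowledged counter growing in this model, which is the symptom the rabbitmqctl command helps you spot.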


3: rabbitmqctl can list all exchanges on the server:


$ sudo rabbitmqctl list_exchanges


The list contains some exchanges named amq.*. They are created by default, and you do not need to use them at this point.


4: List all existing bindings


rabbitmqctl list_bindings


5: To save the logs to a file, open a console and run (receive_logs.php source):


$ php receive_logs.php > logs_from_rabbit.log


To print all log messages to the screen, open a new terminal and run:


$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C


To emit an error-level log message, run:


$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
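The routing rule behind these two commands can be sketched without a broker. The snippet below is a toy model, not php-amqplib code: the function routeDirect() and the queue names screen_queue and file_queue are hypothetical and exist only for this illustration. It assumes the usual direct-exchange rule that a message reaches a queue only when its routing key equals one of that queue's binding keys.

```php
<?php
// Toy model of direct-exchange routing; NOT php-amqplib code.
// routeDirect() and the queue names are hypothetical, for illustration only.

/**
 * @param array<string,string[]> $bindings queue name => binding keys
 * @return string[] names of the queues that receive the message
 */
function routeDirect(array $bindings, string $routingKey): array
{
    $matched = [];
    foreach ($bindings as $queue => $keys) {
        // A direct exchange delivers only on an exact key match.
        if (in_array($routingKey, $keys, true)) {
            $matched[] = $queue;
        }
    }
    return $matched;
}

$bindings = [
    // like: php receive_logs_direct.php info warning error
    'screen_queue' => ['info', 'warning', 'error'],
    // an assumed second consumer that only cares about errors
    'file_queue'   => ['error'],
];

print_r(routeDirect($bindings, 'error'));  // both queues
print_r(routeDirect($bindings, 'info'));   // screen_queue only
print_r(routeDirect($bindings, 'debug'));  // empty: no binding matches
```

In this model a message whose routing key matches no binding reaches no queue at all, which is why the receiver has to be started with the severities it wants to see.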

Queue installation

Step 1: Install RabbitMQ

Installing and starting RabbitMQ on Windows


https://my.oschina.net/owenzhang24/blog/5051652

Step 2: Install php-amqplib with Composer

composer require php-amqplib/php-amqplib

Step 3: Classes

  1. Base class implementing the RabbitMQ operations: application/common/lib/classes/rabbitmq/RabbitMq.php

  2. RabbitMQ class for external callers: application/common/lib/classes/RabbitMqWork.php

  3. Test methods that send messages to RabbitMQ: application/index/controller/Index.php

  4. php think commands that receive messages from RabbitMQ: application/common/command/*.php

Step 4: Usage

  1. To send a message, call one of the send methods of the RabbitMqWork class from your own method.

  2. Each class under application/common/command/ adds a php think command. The command name is set with setName() inside configure(), execute() receives the messages from RabbitMQ, and application/command.php must return each command name together with its corresponding class path.

  3. Reference documentation

  4. RabbitMQ Chinese documentation (PHP edition): https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/

  5. Official RabbitMQ documentation: https://www.rabbitmq.com/getstarted.html

Step 5: Source code

application/common/lib/classes/rabbitmq/RabbitMq.php


<?php
// Base class implementing the RabbitMQ operations
namespace app\common\lib\classes\rabbitmq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMq
{
    static private $instance;
    static private $connection;
    static private $channel;

    const DIRECT = 'direct';
    const TOPIC = 'topic';
    const HEADERS = 'headers';
    const FANOUT = 'fanout';

    static private $exchangeNames = [
        self::DIRECT => 'direct_exchange',
        self::TOPIC => 'topic_exchange',
        self::HEADERS => 'headers_exchange',
        self::FANOUT => 'fanout_exchange',
    ];

    const SEVERITYS = ['info', 'warning', 'error'];

    static private $exchangeName = '';

    /**
     * RabbitMq constructor.
     * @param $exchangeType
     */
    private function __construct($exchangeType)
    {
        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        self::$channel = self::$connection->channel();
        if (!empty($exchangeType)) {
            self::$exchangeName = self::$exchangeNames[$exchangeType];
            self::$channel->exchange_declare(
                self::$exchangeName, // exchange name
                $exchangeType,       // exchange type
                false,               // passive: false = create the exchange if it does not exist
                true,                // durable: true = the exchange survives a broker restart
                false                // auto_delete: false = keep the exchange when nothing is bound to it
            );
        }
    }

    /**
     * Get the shared instance
     * @param string $exchangeType
     * @return RabbitMq
     */
    public static function instance($exchangeType = '')
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }

    /**
     * Prevent cloning from outside
     */
    private function __clone()
    {
    }

    /**
     * Simple send
     */
    public function send()
    {
        self::$channel->queue_declare('hello', false, false, false);
        $msg = new AMQPMessage('Hello World!');
        self::$channel->basic_publish($msg, '', 'hello');
        echo "[X] Sent 'Hello World!'\n";
    }

    /**
     * Simple receive
     * @param $callback
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello', false, false, false, true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";

        // no_ack (4th argument) is false because the SimpleWork callback calls basic_ack itself;
        // acknowledging a delivery on a no_ack consumer makes the broker close the channel
        self::$channel->basic_consume('hello', '', false, false, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Add a task to the work queue
     * @param string $data
     */
    public function addTask($data = '')
    {
        // durable = true; note auto_delete = true (5th argument) removes the queue
        // once its last consumer disconnects
        self::$channel->queue_declare('task_queue', false, true, false, true);
        if (empty($data)) $data = 'Hello World!';
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        self::$channel->basic_publish($msg, '', 'task_queue');

        echo "[x] Sent $data \n";
    }

    /**
     * Process the work queue
     * @param $callback
     */
    public function workTask($callback)
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

        // Hand each worker at most one unacknowledged message at a time
        self::$channel->basic_qos(null, 1, null);
        self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Publish
     * @param string $data
     */
    public function sendQueue($data = '')
    {
        if (empty($data)) $data = 'info:Hello World!';
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * Subscribe
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        list($queue_name, ,) = self::$channel->queue_declare(
            "",    // queue name: empty = let the broker generate one
            false, // passive: false = create the queue if it does not exist
            true,  // durable
            true,  // exclusive: only this connection may use the queue; it is deleted when the connection closes
            false  // auto_delete
        );
        self::$channel->queue_bind($queue_name, self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendDirect($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare('', false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }
        echo "[x] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendTopic($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo " [x] Sent ", $routingKey, ':', $data, " \n";
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }

        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Close the channel and the connection
     */
    public function __destruct()
    {
        self::$channel->close();
        self::$connection->close();
    }
}
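One property of the instance() method above is worth keeping in mind: the object is created only once per process, so an exchange type passed on a later call is ignored. The sketch below reproduces just that logic with a stand-in class that opens no connection, and then shows one possible variant that keeps a separate instance per exchange type. ExchangeHolder and PerTypeHolder are hypothetical names that are not part of the project; this is an illustration, not the author's implementation.

```php
<?php
// Stand-in for the singleton logic only; no connection is opened.
// ExchangeHolder and PerTypeHolder are hypothetical names, not part of the project.
final class ExchangeHolder
{
    private static $instance;
    public $exchangeType;

    private function __construct(string $exchangeType)
    {
        $this->exchangeType = $exchangeType;
    }

    // Same shape as RabbitMq::instance(): the first call wins.
    public static function instance(string $exchangeType = ''): self
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }
}

final class PerTypeHolder
{
    /** @var array<string,self> one instance per exchange type */
    private static $instances = [];
    public $exchangeType;

    private function __construct(string $exchangeType)
    {
        $this->exchangeType = $exchangeType;
    }

    // Variant: look the instance up by exchange type.
    public static function instance(string $exchangeType = ''): self
    {
        if (!isset(self::$instances[$exchangeType])) {
            self::$instances[$exchangeType] = new self($exchangeType);
        }
        return self::$instances[$exchangeType];
    }
}

$first = ExchangeHolder::instance('fanout');
$second = ExchangeHolder::instance('topic');  // same object as $first

$fanout = PerTypeHolder::instance('fanout');
$topic = PerTypeHolder::instance('topic');    // a separate object for 'topic'

var_dump($second->exchangeType);  // string(6) "fanout"
var_dump($topic->exchangeType);   // string(5) "topic"
```

For the commands in this article this does not matter, because each php think command runs in its own process and uses a single exchange type. It would matter in a long-running process that publishes to more than one exchange.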




application/common/lib/classes/RabbitMqWork.php


<?php
// RabbitMQ class for external callers
namespace app\common\lib\classes;

use app\common\lib\classes\rabbitmq\RabbitMq;

class RabbitMqWork
{
    private $RabbitMq;

    public function __construct($exchangeType = '')
    {
        $this->RabbitMq = RabbitMq::instance($exchangeType);
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $this->RabbitMq->send();
    }

    /**
     * Receive (simple)
     * @param $callback
     */
    public function receive($callback)
    {
        $this->RabbitMq->receive($callback);
    }

    /**
     * Send (work queue)
     * @param $data
     */
    public function addTask($data)
    {
        $this->RabbitMq->addTask($data);
    }

    /**
     * Receive (work queue)
     * @param $callback
     */
    public function workTask($callback)
    {
        $this->RabbitMq->workTask($callback);
    }

    /**
     * Publish (fanout exchange)
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->RabbitMq->sendQueue($data);
    }

    /**
     * Subscribe (fanout exchange)
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->RabbitMq->subscribeQueue($callback);
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->RabbitMq->sendDirect($routingKey, $data);
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveDirect($callback, $bindingKeys);
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->RabbitMq->sendTopic($routingKey, $data);
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveTopic($callback, $bindingKeys);
    }
}




application/index/controller/Index.php


<?php
namespace app\index\controller;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use app\polymerize\tool\module\es\SearchBlog;
use app\polymerize\tool\module\es\SyncBlog;
use think\Collection;

class Index extends Collection
{
    public function index()
    {
//        $this->send();
//        $this->addTask();
//        $this->sendQueue();
//        $this->sendDirect();
        $this->sendTopic();
        var_dump(11);
        die();
    }

    public function searchBlog()
    {
//        $id=1;
//        $res = SyncBlog::getInstance()->syncBlog($id);
        $search = '11';
        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
        var_dump($res);
        die();
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->send();
    }

    /**
     * Send (work queue)
     */
    public function addTask()
    {
        $data = input('data', 'This is work task!');
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($data);
    }

    /**
     * Send (fanout exchange)
     */
    public function sendQueue()
    {
        $data = input('data', 'This is send queue1');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $RabbitMqWork->sendQueue($data);
    }

    /**
     * Send (direct exchange)
     */
    public function sendDirect()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'info');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $RabbitMqWork->sendDirect($routingKey, $data);
    }

    /**
     * Send (topic exchange)
     */
    public function sendTopic()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'lazy.boy');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $RabbitMqWork->sendTopic($routingKey, $data);
    }
}


application/command.php


<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

// command name => command class (the class files below declare namespace app\command)
return [
    'simpleMq' => 'app\command\SimpleWork',
    'workQueue' => 'app\command\WorkQueue',
    'sendQueue' => 'app\command\SendQueue',
    'directQueue' => 'app\command\DirectQueue',
    'topicQueue' => 'app\command\TopicQueue',
];


application/common/command/*.php


application/command/DirectQueue.php


<?php
// Receive (direct exchange)
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class DirectQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('directQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        // Print the routing key and body of every message received
        $callback = function ($msg) {
            echo "[x] " . $msg->delivery_info['routing_key'] . ":$msg->body \n";
        };
        // Bind one key per severity: info, warning, error
        $RabbitMqWork->receiveDirect($callback, RabbitMq::SEVERITYS);
    }
}


application/command/SendQueue.php


<?php
// Subscribe (fanout exchange)
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SendQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('sendQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        // Print each message and also write it to the application log
        $callback = function ($msg) {
            echo 'Receive:';
            echo "Msg:$msg->body \n";
            \Log::error("Msg:$msg->body");
        };
        $RabbitMqWork->subscribeQueue($callback);
    }
}


application/command/SimpleWork.php


<?php
// Receive (simple)
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SimpleWork extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('simpleMq');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo 'Receive:';
            $queueName = $msg->delivery_info['routing_key'];
            $msgData = $msg->body;
            $isAck = true;
            echo 'Msg:' . $msgData . "\n";
            echo 'QueueName:' . $queueName . "\n";
            if ($isAck) {
                // Acknowledge the delivery on the channel it arrived on
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $RabbitMqWork->receive($callback);
    }
}


application/command/TopicQueue.php


<?php
// Receive (topic exchange)
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class TopicQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('topicQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $callback = function ($msg) {
            echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
        };
        // "*" stands for exactly one word, "#" for zero or more words
        $bindingKeys = [
            '*.orange.*',
            '*.*.rabbit',
            'lazy.#'
        ];
        $RabbitMqWork->receiveTopic($callback, $bindingKeys);
    }
}
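To see which of the three binding keys above would catch a given routing key, the matching rule can be sketched in plain PHP. This is a toy re-implementation for illustration, not how the broker or php-amqplib does it: topicMatches() and matchWords() are hypothetical helper names, and the rule assumed is the documented one, where "*" stands for exactly one dot-separated word and "#" for zero or more words.

```php
<?php
// Toy re-implementation of topic matching; NOT broker or php-amqplib code.
// topicMatches() and matchWords() are hypothetical helper names.

function topicMatches(string $bindingKey, string $routingKey): bool
{
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($pattern, $words);
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // Pattern exhausted: a match only if no words are left over.
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($skip = 0; $skip <= count($words); $skip++) {
            if (matchWords($pattern, array_slice($words, $skip))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    // '*' accepts any single word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

$bindingKeys = ['*.orange.*', '*.*.rabbit', 'lazy.#'];
foreach (['lazy.boy', 'quick.orange.rabbit', 'quick.brown.fox'] as $routingKey) {
    $hits = array_values(array_filter($bindingKeys, function ($key) use ($routingKey) {
        return topicMatches($key, $routingKey);
    }));
    echo $routingKey, ' => ', $hits ? implode(', ', $hits) : '(no match)', "\n";
}
```

With these keys, the default routing key lazy.boy used by sendTopic() in Index.php is matched only by lazy.#, quick.orange.rabbit is matched by both *.orange.* and *.*.rabbit, and quick.brown.fox is matched by none of them.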


application/command/WorkQueue.php


<?php
// Receive (work queue)
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class WorkQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('workQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            // Simulate work: sleep one second per dot in the message body
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";
            // Acknowledge only after the work has finished
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $RabbitMqWork->workTask($callback);
    }
}
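The sleep() call in the callback above turns the message body into a fake workload: every dot costs one second. The short sketch below applies the same substr_count() expression to a few made-up bodies, so you can pick test messages of different durations. The helper name simulatedSeconds() and the sample bodies are assumptions for illustration.

```php
<?php
// Same expression as in the WorkQueue callback, applied to sample bodies.
// simulatedSeconds() and the sample message bodies are made up for illustration.
function simulatedSeconds(string $body): int
{
    return substr_count($body, '.');
}

foreach (['First message.', 'Second message..', 'Third message...'] as $body) {
    echo $body, ' -> ', simulatedSeconds($body), "s\n";
}
```

Sending a mix of short and long bodies through addTask() while two workers are running is a simple way to watch basic_qos spread the tasks between them.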

