使用 php-amqplib 实现 RabbitMq
- 2022 年 1 月 18 日
本文字数:9825 字
阅读完需:约 32 分钟
本文环境 Ubuntu 20.04,Nginx1.8,PHP7.3,RabbitMq3.9
不懂的可以评论或联系我邮箱:owen@owenzhang.com
著作权归 OwenZhang 所有。商业转载请联系 OwenZhang 获得授权,非商业转载请注明出处。
项目代码
https://gitee.com/owenzhang24/tp5
队列笔记
1: 列出队列(Listing queues)
如果你想查看 Rabbitmq 队列,并且想知道有多少消息存在其中,你(作为特权用户)可以使用 rabbitmqctl 工具:
rabbitmqctl list_queues。
在 Windows 中,省略 sudo:
rabbitmqctl.bat list_queues
2: 工作队列
我们发现即使使用 CTRL+C 杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
一个很容易犯的错误就是忘了 basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ 就会占用越来越多的内存。
为了排除这种错误,你可以使用 rabbitmqctl 命令,输出 messages_unacknowledged 字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 window 系统运行,去掉 sudo:
$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
3: rabbitmqctl 能够列出服务器上所有的交换器:
$ sudo rabbitmqctl list_exchanges
这个列表中有一些叫做 amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。
4:列出所有现存的绑定
rabbitmqctl list_bindings
5: 如果你想把日志保存到文件中,只需要打开控制台输入: ( receive_logs.php 源代码)
$ php receive_logs.php > logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
如果要触发一个 error 级别的日志,只需要输入:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
队列安装
第一:安装 RabbitMq 环境
https://my.oschina.net/owenzhang24/blog/5051652
第二:
composer require php-amqplib/php-amqplib
第三:代码类
rabbitMq 实现的基础类:application/common/lib/classes/rabbitmq/RabbitMq.php
供外部调用的 rabbitMq 类:application/common/lib/classes/RabbitMqWork.php
测试发送消息到 rabbitMq 中的方法:application/index/controller/Index.php
添加 php think 命令实现接收 rabbitMq 中的消息:application/common/command/*.php
第四:使用说明
发送消息时直接在自己的方法中调用 RabbitMqWork.php 类中的几个送消息的方法即可。
application/common/command/下的类都是实现添加 php think 命令的类,在 configure 方法中的 setName()中设置命令名称,execute()方法是为了执行接收 rabbitMq 中的消息,同时在 application/command.php 中 return 添加设置的命令名称及对应的命令目录地址。
贡献文档
RabbitMQ 中文文档-PHP版。https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/
RabbitMQ官方文档。https://www.rabbitmq.com/getstarted.html
第五:源码
application/common/lib/classes/rabbitmq/RabbitMq.php
<?php
//rabbitMq实现的基础类
namespace app\common\lib\classes\rabbitmq;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMq
{
static private $instance;
static private $connection;
static private $channel;
const DIRECT = 'direct';
const TOPIC = 'topic';
const HEADERS = 'headers';
const FANOUT = 'fanout';
static private $exchangeNames = [
self::DIRECT => 'direct_exchange',
self::TOPIC => 'topic_exchange',
self::HEADERS => 'headers_exchange',
self::FANOUT => 'fanout_exchange',
];
const SEVERITYS = [
'info',
'warning',
'error'
];
static private $exchangeName = '';
/**
* RabbitMq constructor.
* @param $exchangeType
*/
private function __construct($exchangeType)
{
self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
self::$channel = self::$connection->channel();
if (!empty($exchangeType)) {
self::$exchangeName = self::$exchangeNames[$exchangeType];
self::$channel->exchange_declare(
self::$exchangeName, //交换机名称
$exchangeType, //路由类型
false, //don't check if a queue with the same name exists 是否检测同名队列
true, //the queue will not survive server restarts 是否开启队列持久化
false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
);
}
}
/**
* 实例化
* @param string $exchangeType
* @return RabbitMq
*/
public static function instance($exchangeType = '')
{
if (!self::$instance instanceof self) {
self::$instance = new self($exchangeType);
}
return self::$instance;
}
/**
* 防止被外部复制
*/
private function __clone()
{
}
/**
* 简单的发送
*/
public function send()
{
self::$channel->queue_declare('hello', false, false, false);
$msg = new AMQPMessage('Hello World!');
self::$channel->basic_publish($msg, '', 'hello');
echo "[X] Sent 'Hello World!'\n";
}
/**
* 简单的接收
* @param $queueName
* @param $callback
*/
public function receive($callback)
{
self::$channel->queue_declare('hello', false, false, false, true);
echo "[*] Waiting for messages. To exit press CTRL+C\n";
self::$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
/**
* 添加工作队列
* @param string $data
*/
public function addTask($data = '')
{
self::$channel->queue_declare('task_queue', false, true, false, true);
if (empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
self::$channel->basic_publish($msg, '', 'task_queue');
echo "[x] Sent $data \n";
}
/**
* 执行工作队列
* @param $callback
*/
public function workTask($callback)
{
self::$channel->queue_declare('task_queue', false, true, false, true);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
self::$channel->basic_qos(null, 1, null);
self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
/**
* 发布
* @param string $data
*/
public function sendQueue($data = '')
{
if (empty($data)) $data = 'info:Hello World!';
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName);
echo "[x] Sent $data \n";
}
/**
* 订阅
* @param $callback
*/
public function subscribeQueue($callback)
{
list($queue_name, ,) = self::$channel->queue_declare(
"", //队列名称
false, //don't check if a queue with the same name exists 是否检测同名队列
true, //the queue will not survive server restarts 是否开启队列持久化
true, //the queue might be accessed by other channels 队列是否可以被其他队列访问
false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
);
self::$channel->queue_bind($queue_name, self::$exchangeName);
echo "[*] Waiting for logs. To exit press CTRL+C \n";
self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
/**
* 发送(直接交换机)
* @param $routingKey
* @param string $data
*/
public function sendDirect($routingKey, $data = '')
{
if (empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
echo "[x] Sent $routingKey:$data \n";
}
/**
* 接收(直接交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
public function receiveDirect(\Closure $callback, array $bindingKeys)
{
list($queue_namme, ,) = self::$channel->queue_declare('', false, true, true, false);
foreach ($bindingKeys as $bindingKey) {
self::$channel->queue_bind($queue_namme, self::$exchangeName, $bindingKey);
}
echo "[x] Waiting for logs. To exit press CTRL+C \n";
self::$channel->basic_consume($queue_namme, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
/**
* 发送(主题交换机)
* @param $routingKey
* @param string $data
*/
public function sendTopic($routingKey, $data = '')
{
if (empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
echo " [x] Sent ", $routingKey, ':', $data, " \n";
}
/**
* 接收(主题交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
public function receiveTopic(\Closure $callback, array $bindingKeys)
{
list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
foreach ($bindingKeys as $bindingKey) {
self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count(self::$channel->callbacks)) {
self::$channel->wait();
}
}
/**
* 销毁
*/
public function __destruct()
{
// TODO: Implement __destruct() method.
self::$channel->close();
self::$connection->close();
}
}
\
application/common/lib/classes/RabbitMqWork.php
<?php
//供外部调用的rabbitMq类
namespace app\common\lib\classes;
use app\common\lib\classes\rabbitmq\RabbitMq;
class RabbitMqWork
{
private $RabbitMq;
public function __construct($exchageType = '')
{
$this->RabbitMq = RabbitMq::instance($exchageType);
}
/**
* 发送(普通)
*/
public function send()
{
$this->RabbitMq->send();
}
/**
* 接收(普通)
* @param $callback
*/
public function receive($callback)
{
$this->RabbitMq->receive($callback);
}
/**
* 发送(工作队列)
* @param $data
*/
public function addTask($data)
{
$this->RabbitMq->addTask($data);
}
/**
* 接收(工作队列)
* @param $callback
*/
public function workTask($callback)
{
$this->RabbitMq->workTask($callback);
}
/**
* 发布(扇形交换机)
* @param $data
*/
public function sendQueue($data)
{
$this->RabbitMq->sendQueue($data);
}
/**
* 订阅(扇形交换机)
* @param $callback
*/
public function subscribeQueue($callback)
{
$this->RabbitMq->subscribeQueue($callback);
}
/**
* 发送(直接交换机)
* @param $bindingKey
* @param $data
*/
public function sendDirect($routingKey, $data)
{
$this->RabbitMq->sendDirect($routingKey, $data);
}
/**
* 接收(直接交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
public function receiveDirect(\Closure $callback, array $bindingKeys)
{
$this->RabbitMq->receiveDirect($callback, $bindingKeys);
}
/**
* 发送(主题交换机)
* @param $routingKey
* @param $data
*/
public function sendTopic($routingKey, $data)
{
$this->RabbitMq->sendTopic($routingKey, $data);
}
/**
* 接收(主题交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
public function receiveTopic(\Closure $callback, array $bindingKeys)
{
$this->RabbitMq->receiveTopic($callback, $bindingKeys);
}
}
\
application/index/controller/Index.php
<?php
namespace app\index\controller;
use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use app\polymerize\tool\module\es\SearchBlog;
use app\polymerize\tool\module\es\SyncBlog;
use think\Collection;
class Index extends Collection
{
public function index()
{
// $this->send();
// $this->addTask();
// $this->sendQueue();
// $this->sendDirect();
$this->sendTopic();
var_dump(11);
die();
}
public function searchBlog()
{
// $id=1;
// $res = SyncBlog::getInstance()->syncBlog($id);
$search='11';
$res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
var_dump($res);
die();
var_dump(1111);
die();
}
/**
* 发送(普通)
*/
public function send()
{
$RabbitMqWork = new RabbitMqWork();
$RabbitMqWork->send();
}
/**
* 发送(工作队列)
*/
public function addTask()
{
$data = input('data', 'This is work task!');
$RabbitMqWork = new RabbitMqWork();
$RabbitMqWork->addTask($data);
}
/**
* 发送(扇形交换机)
*/
public function sendQueue()
{
$data = input('data', 'This is send queue1');
$RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
$RabbitMqWork->sendQueue($data);
}
/**
* 发送(直接交换机)
*/
public function sendDirect()
{
$data = input('data', 'Hello World!');
$routingKey = input('routingKey', 'info');
$RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
$RabbitMqWork->sendDirect($routingKey, $data);
}
/**
* 发送(主题交换机)
*/
public function sendTopic()
{
$data = input('data', 'Hello World!');
$routingKey = input('routingKey', 'lazy.boy');
$RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
$RabbitMqWork->sendTopic($routingKey, $data);
}
}
application/command.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
return [
'simpleMq' => 'application\command\SimpleWork',
'workQueue' => 'application\command\WorkQueue',
'sendQueue' => 'application\command\SendQueue',
'directQueue' => 'application\command\DirectQueue',
'topicQueue' => 'application\command\TopicQueue',
];
application/common/command/*.php
application/command/DirectQueue.php
<?php
/**
* 接收(直接交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
namespace app\command;
use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class DirectQueue extends Command
{
protected function configure()
{
parent::configure(); // TODO: Change the autogenerated stub
$this->setName('directQueue');
}
protected function execute(Input $input, Output $output)
{
$RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
$callback = function ($msg){
echo "[x] ".$msg->delivery_info['routing_key'].":$msg->body \n";
};
$RabbitMqWork->receiveDirect($callback,RabbitMq::SEVERITYS);
}
}
application/command/SendQueue.php
<?php
/**
* 订阅(扇形交换机)
* @param $callback
*/
namespace app\command;
use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;
class SendQueue extends Command
{
protected function configure()
{
parent::configure(); // TODO: Change the autogenerated stub
$this->setName('sendQueue');
}
protected function execute(Input $input, Output $output)
{
$RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
$callback = function ($msg) {
echo 'Receive:';
echo "Msg:$msg->body \n";
\Log::error("Msg:$msg->body");
};
$RabbitMqWork->subscribeQueue($callback);
}
}
application/command/SimpleWork.php
<?php
/**
* 接收(普通)
* @param $callback
*/
namespace app\command;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;
class SimpleWork extends Command
{
protected function configure()
{
parent::configure(); // TODO: Change the autogenerated stub
$this->setName('simpleMq');
}
protected function execute(Input $input, Output $output)
{
$RabbitMqWork= new RabbitMqWork();
$callback = function ($msg){
echo 'Receive:';
$queueName = $msg->delivery_info['routing_key'];
$msgData = $msg->body;
$isAck = true;
echo 'Msg:'.$msgData."\n";
echo 'QueueName:'.$queueName."\n";
if($isAck) {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
};
$RabbitMqWork->receive($callback);
}
}
application/command/TopicQueue.php
<?php
/**
* 接收(主题交换机)
* @param \Closure $callback
* @param array $bindingKeys
*/
namespace app\command;
use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class TopicQueue extends Command
{
protected function configure()
{
parent::configure(); // TODO: Change the autogenerated stub
$this->setName('topicQueue');
}
protected function execute(Input $input, Output $output)
{
$RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
$callback = function ($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$bindingKeys = [
'*.orange.*',
'*.*.rabbit',
'lazy.#'
];
$RabbitMqWork->receiveTopic($callback,$bindingKeys);
}
}
application/command/WorkQueue.php
<?php
/**
* 接收(工作队列)
* @param $callback
*/
namespace app\command;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class WorkQueue extends Command
{
protected function configure()
{
parent::configure(); // TODO: Change the autogenerated stub
$this->setName('workQueue');
}
protected function execute(Input $input, Output $output)
{
$RabbitMqWork = new RabbitMqWork();
$callback = function ($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$RabbitMqWork->workTask($callback);
}
}
Buy me a cup of coffee :)
觉得对你有帮助,就给我打赏吧,谢谢!
微信赞赏码链接,点击跳转:
版权声明: 本文为 InfoQ 作者【Owen Zhang】的原创文章。
原文链接:【http://xie.infoq.cn/article/65cc7e958b5a6e257139a595e】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
Owen Zhang
还未添加个人签名 2020.05.10 加入
I actually hate programming, but I love solving problems. Phper & Gopher. https://gitee.com/owenzhang24 Email: owen@owenzhang.com
评论