写点什么

宝塔中极速安装的 PHP 如何使用 AMQP 连接 RabbitMQ

作者:北桥苏
  • 2023-05-12
    广东
  • 本文字数:3217 字

    阅读完需:约 11 分钟

前言:

有些人为了让项目快速上线,服务器往往安装宝塔面板,然后再极速安装 LNMP。尽管环境搭建的时间省了,但是宝塔上 PHP 中扩展包没有提供 AMQP。这时候只是为了使用消息队列而对 PHP 大动干戈, 不如使用一个 PHP AMQP 的库,即用即装,不对环境造成影响。

简介:

php-amqplib 客户端库,通过 composer 安装,不需要在 PHP 中安装扩展,以下为两种不同的安装方式。


\1. 项目中新建 composer.json,添加如下代码,然后 composer install


{    "require": {        "php-amqplib/php-amqplib": " 2.6.*"    }}
复制代码


\2. 命令进入到项目,然后 composer require php-amqplib/php-amqplib 2.6.*



RabbitMQ 设置:

\1. 进入 web 管控台,添加新用户,角色管理员,任何 IP 上都可以登录,授权指定虚拟机。



\2. 添加交换机



\3. 添加队列并与交互机绑定。



编码:

\1. 封装 rabbitMQ 类。


<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;
/** * Class RabbitMQ. */class RabbitMQ{ const READ_LINE_NUMBER = 0; const READ_LENGTH = 1; const READ_DATA = 2;
public $config;
public static $prefix = 'autoinc_key:'; protected $exchangeName = 'flow'; protected $queueName = 'flow_queue';
/** * @var \PhpAmqpLib\Connection\AMQPStreamConnection */ protected $connection; /** * @var \PhpAmqpLib\Channel\AMQPChannel */ protected $channel; protected $queue; //配置项 private $host; private $port; private $user; private $pass; private $vhost;
public function __construct($config = []) { //$this->config = $config;
//设置rabbitmq配置值 $this->host = '192.168.1.101'; $this->port = 5672; $this->user = 'beiqiaosu'; $this->pass = 'beiqiaosu'; $this->vhost = 'report';
$this->connect(); }
public function __call($method, $args = []) { $reConnect = false; while (1) { try { $this->initChannel(); $result = call_user_func_array([$this->channel, $method], $args); } catch (\Exception $e) { //已重连过,仍然报错 if ($reConnect) { throw $e; }
\Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1)); if ($this->connection) { $this->close(); }
$this->connect();
$reConnect = true; continue; }
return $result; } //不可能到这里 return false; }
/** * 连接rabbitmq消息队列. * * @return bool */ public function connect() { try { if ($this->connection) { unset($this->connection); } $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost); } catch (\Exception $e) { echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage(); return false; } }
/** * 关闭连接. */ public function close() { $this->channel->close(); $this->connection->close(); }
/** * 设置交换机名称. * * @param string $exchangeName */ public function setExchangeName($exchangeName = '') { $exchangeName && $this->exchangeName = $exchangeName; }
/** * 设置队列名称. * * @param string $queueName */ public function setQueueName($queueName = '') { $queueName && $this->queueName = $queueName; }
/** * 设置频道. */ public function initChannel() { if (!$this->channel) { //通道 $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName, false, true, false, false); $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName); } }
/** * 获取队列数据. * * @return mixed */ public function pop() { while (1) { try { $this->connect(); $this->initChannel(); $message = $this->channel->basic_get($this->queueName); if ($message) { $this->channel->basic_ack($message->delivery_info['delivery_tag']); $result = $message->body; } else { throw new \Exception('Empty Queue Data'); } } catch (\Exception $e) { //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")"); sleep(1); continue; }
return $result; } //不可能到这里 return false; }
/** * 插入队列数据. * * @param $data * * @return bool */ public function push($data) { while (1) { try { $this->connect(); $this->initChannel(); $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->channel->basic_publish($message, $this->exchangeName); } catch (\Exception $e) { echo "$e->getMessage()"; continue; }
return true; } //不可能到这里 return false; }}
复制代码


\2. 操作 mq,出队,入队。


<?php
require_once "vendor/autoload.php";require_once "component/RabbitMQ.php";
$mq = new RabbitMQ();
// 消息消费测试/*try { $res = $mq->pop(); }catch(\Exception $e) { var_dump($e->getMessage());die;}*/

// 消息生产测试try { $res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155'])); }catch(\Exception $e) { var_dump($e->getMessage());die;}

var_dump($res);die;
复制代码

测试:

\1. 先通过生产消息(入队)方法运行一下,然后进入队列中 get message 查看消息总数。



\2. 测试调用消费,再查看总数。






用户头像

北桥苏

关注

公众号:ZERO开发 2023-05-08 加入

专注后端实战技术分享,不限于PHP,Python,JavaScript, Java等语言,致力于给猿友们提供有价值,有干货的内容。

评论

发布
暂无评论
宝塔中极速安装的PHP如何使用AMQP连接RabbitMQ_RabbitMQ_北桥苏_InfoQ写作社区