namespace rabbitmq;
class Amq
/** @var self|null Shared instance, created on first use by getInstance(). */
protected static $instance;
protected $exchange='router_visit'; // Exchange name (must be bound to the queue)
protected $queue ='visit_log'; // Queue name
protected $route ='router_visit'; // Routing key (must be bound to the queue)
protected $consumer_tag='consumer';
// Connection settings handed to \AMQPConnection by publisher() and consumer().
protected $config = [
'host' => '',
'port' => 5672,
'login' => 'guest', // broker user name
'password' => 'guest', // NOTE(review): a password-like string was removed from this comment; keep credentials out of source and rotate it if it was ever real
'vhost' => 'log',
'amqp_debug' => true
// Can be overridden through publisher()'s $config; presumably an index into
// $exchange_type — TODO confirm, no visible code reads it.
protected $exchange_index = 0;
protected $exchange_type = [
/**
 * Return the shared instance of this class, creating it on first use.
 *
 * @author beiqiaosu
 * @since  2019/11/13 16:10
 *
 * @return self
 */
public static function getInstance()
{
    // Reuse the cached object when one has already been built.
    if (self::$instance instanceof self) {
        return self::$instance;
    }

    self::$instance = new self();

    return self::$instance;
}
/**
 * Publish one message to the configured exchange.
 *
 * Keys present in $config override the matching settings of this object
 * before sending. The overrides are written to instance properties, so they
 * remain in effect for later calls on the same object.
 *
 * @param string $message Message body to publish.
 * @param array  $config  Optional overrides: 'vhost', 'exchange', 'queue',
 *                        'consumer_tag', 'route', 'exchange_index'.
 *
 * @return string "Send Message:" followed by the string form of whatever
 *                AMQPExchange::publish() returns, then a newline.
 *
 * @throws \AMQPConnectionException When connect() reports failure.
 */
public function publisher($message, $config = [])
{
    // Apply per-call overrides, keeping the current value when a key is absent.
    $this->config['vhost'] = $config['vhost'] ?? $this->config['vhost'];
    $this->exchange        = $config['exchange'] ?? $this->exchange;
    $this->queue           = $config['queue'] ?? $this->queue;
    $this->consumer_tag    = $config['consumer_tag'] ?? $this->consumer_tag;
    $this->route           = $config['route'] ?? $this->route;
    $this->exchange_index  = $config['exchange_index'] ?? $this->exchange_index;

    $cnn = new \AMQPConnection($this->config);

    // Stop here on failure instead of printing and carrying on with a dead
    // connection. Only an explicit false counts as failure: php-amqp builds
    // that report errors by throwing may return no value from connect(), and
    // a plain truthiness test would misread that as "not connected".
    if ($cnn->connect() === false) {
        throw new \AMQPConnectionException('Cannot connect to the broker');
    }

    $channel = new \AMQPChannel($cnn);

    $ex = new \AMQPExchange($channel);
    // Address the configured exchange; without a name the publish would not
    // target $this->exchange at all.
    $ex->setName($this->exchange);
    $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
    $ex->setFlags(AMQP_DURABLE);       // durable
    // NOTE(review): the exchange is not declared here, so it is expected to
    // exist on the broker already — confirm, or declare it before publishing.

    // delivery_mode 2 asks the broker to persist the message.
    $published = $ex->publish($message, $this->route, AMQP_NOPARAM, ['delivery_mode' => 2]);

    return "Send Message:" . $published . "\n";
}
/**
 * @note Message consumption
 * @author: tata
 * @since: 2019/11/13 16:10
 */
// Connects to the broker, binds a queue to the 'router_visit' exchange and
// acknowledges every message handed to the consume callback.
// NOTE(review): the names below are hard-coded locals; $this->exchange,
// $this->queue and $this->route (which publisher() can override) are not used here.
public function consumer() {
$exchange='router_visit'; // exchange
$queue ='visit_log'; // queue
$route ='router_visit'; // routing key
$cnn = new \AMQPConnection($this->config);
if (!$cnn->connect()) {
// NOTE(review): only prints; nothing visible here stops execution after a failed connect.
echo "Cannot connect to the broker";
$channel = new \AMQPChannel($cnn);
// NOTE(review): $ex is not used again in the visible lines.
$ex = new \AMQPExchange($channel);
$q = new \AMQPQueue($channel);
// NOTE(review): the local $queue name is never applied to $q in the visible
// code — confirm a setName() call is not missing before this bind.
$q->bind($exchange, $route);
// The callback's $queue parameter is supplied by consume() and is not the
// local string $queue above (the closure has no use clause).
$ret = $q->consume(function($envelope, $queue) {
// Decode the message body into an array
// $origin_data = json_decode($envelope->getBody(),true);
// dump($envelope->getBody());die;
// Placeholder result: always true, so every message is acknowledged.
$res = true;
// Once business processing has finished, tell MQ the message is consumed
if ($res) $queue->ack($envelope->getDeliveryTag()); // send the ACK manually