<?php
namespace rabbitmq;
class Amq{    /**     * @var object 对象实例     */    protected static $instance;
    protected $exchange='router_visit';  // 交换机(需要在队列中绑定)    protected $queue ='visit_log';       // 队列    protected $route ='router_visit';    // 路由key(需要在队列中绑定)    protected $consumer_tag='consumer';    protected $config = [        'host' => '146.53.206.264',        'port' => 5672,        'login' => 'guest',		//guest        'password' => 'guest',	//Na18gR@9tf        'vhost' => 'log',        'amqp_debug' => true    ];    protected $exchange_index = 0;    protected $exchange_type = [        'direct',        'fanout',        'topic',        'headers'    ];
    /**     * @note 实例化     * @author: beiqiaosu     * @since: 2019/11/13 16:10     */    public static function getInstance()    {        if (!(self::$instance instanceof self)) {            self::$instance = new self();        }        return self::$instance;    }
	    /**     * @Notes: 消息生产     */    public function publisher($message,$config=[]) {        //如果有配置就用新配置        $this->config ['vhost'] = $config['vhost']?? $this->config ['vhost'];        $this->exchange = $config['exchange']?? $this->exchange;        $this->queue = $config['queue']?? $this->queue;
        $this->consumer_tag = $config['consumer_tag']?? $this->consumer_tag;        $this->route = $config['route']?? $this->route;        $this->exchange_index = $config['exchange_index']?? $this->exchange_index;
        $cnn = new \AMQPConnection($this->config);        if (!$cnn->connect()) {            echo "Cannot connect to the broker";            exit();        }
        $channel = new \AMQPChannel($cnn);        $ex = new \AMQPExchange($channel);        $ex->setName($this->exchange);
        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
        $ex->setFlags(AMQP_DURABLE); //持久化        $ex->declareExchange();        return "Send Message:".$ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";    }
    /**     * @note 消费     * @author: tata     * @since: 2019/11/13 16:10     */    public function consumer() {
        $exchange='router_visit';       //交换机        $queue ='visit_log';         //队列        $route ='router_visit';        //路由
        //连接broker        $cnn = new \AMQPConnection($this->config);        if (!$cnn->connect()) {            echo "Cannot connect to the broker";            exit();        }        $channel = new \AMQPChannel($cnn);        $ex = new \AMQPExchange($channel);        //设置交换机名称        $ex->setName($exchange);        //设置交换机类型        //AMQP_EX_TYPE_DIRECT:直连交换机        //AMQP_EX_TYPE_FANOUT:扇形交换机        //AMQP_EX_TYPE_HEADERS:头交换机        //AMQP_EX_TYPE_TOPIC:主题交换机        $ex->setType(AMQP_EX_TYPE_DIRECT);        //设置交换机持久        $ex->setFlags(AMQP_DURABLE);        //声明交换机        $ex->declareExchange();        //创建一个消息队列        $q = new \AMQPQueue($channel);        //设置队列名称        $q->setName($queue);        //设置队列持久        $q->setFlags(AMQP_DURABLE);        //声明消息队列        //$q->declareQueue();        //交换机和队列通过$route进行绑定        $q->bind($exchange, $route);
        $ret = $q->consume(function($envelope, $queue) {
            // 取出消息主题转为数组//            $origin_data = json_decode($envelope->getBody(),true);//            dump($envelope->getBody());die;
            /**对消息主题执行业务**/            $res = true;            /**对消息主题执行业务**/
            // 业务处理完毕发送给MQ消费掉该消息            if ($res) $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答        });
        dump($ret);die;
        $cnn->disconnect();    }
}
评论