本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
php扩展地址: http://pecl.php.net/package/amqp
具体以官网为准 http://www.rabbitmq.com/getstarted.html

介绍

config.php 配置信息
BaseMQ.php MQ基类
ProductMQ.php 生产者类
ConsumerMQ.php 消费者类
Consumer2MQ.php 消费者2(可有多个)

config.php

<?php
return [
//配置
'host' => [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/',
],
//交换机
'exchange'=>'word',
//路由
'routes' => [],
];

  BaseMQ.php

<?php
/**
* Created by PhpStorm.
* User: pc
* Date: 2018/12/13
* Time: 14:11
*/ namespace MyObjSummary\rabbitMQ; /** Member
* AMQPChannel
* AMQPConnection
* AMQPEnvelope
* AMQPExchange
* AMQPQueue
* Class BaseMQ
* @package MyObjSummary\rabbitMQ
*/
class BaseMQ
{
/** MQ Channel
* @var \AMQPChannel
*/
public $AMQPChannel ; /** MQ Link
* @var \AMQPConnection
*/
public $AMQPConnection ; /** MQ Envelope
* @var \AMQPEnvelope
*/
public $AMQPEnvelope ; /** MQ Exchange
* @var \AMQPExchange
*/
public $AMQPExchange ; /** MQ Queue
* @var \AMQPQueue
*/
public $AMQPQueue ; /** conf
* @var
*/
public $conf ; /** exchange
* @var
*/
public $exchange ; /** link
* BaseMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
$conf = require 'config.php' ;
if(!$conf)
throw new \AMQPConnectionException('config error!');
$this->conf = $conf['host'] ;
$this->exchange = $conf['exchange'] ;
$this->AMQPConnection = new \AMQPConnection($this->conf);
if (!$this->AMQPConnection->connect())
throw new \AMQPConnectionException("Cannot connect to the broker!\n");
} /**
* close link
*/
public function close()
{
$this->AMQPConnection->disconnect();
} /** Channel
* @return \AMQPChannel
* @throws \AMQPConnectionException
*/
public function channel()
{
if(!$this->AMQPChannel) {
$this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
}
return $this->AMQPChannel;
} /** Exchange
* @return \AMQPExchange
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function exchange()
{
if(!$this->AMQPExchange) {
$this->AMQPExchange = new \AMQPExchange($this->channel());
$this->AMQPExchange->setName($this->exchange);
}
return $this->AMQPExchange ;
} /** queue
* @return \AMQPQueue
* @throws \AMQPConnectionException
* @throws \AMQPQueueException
*/
public function queue()
{
if(!$this->AMQPQueue) {
$this->AMQPQueue = new \AMQPQueue($this->channel());
}
return $this->AMQPQueue ;
} /** Envelope
* @return \AMQPEnvelope
*/
public function envelope()
{
if(!$this->AMQPEnvelope) {
$this->AMQPEnvelope = new \AMQPEnvelope();
}
return $this->AMQPEnvelope;
}
}

 ProductMQ.php

<?php
//生产者 P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ProductMQ extends BaseMQ
{
private $routes = ['hello','word']; //路由key /**
* ProductMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
parent::__construct();
} /** 只控制发送成功 不接受消费者是否收到
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function run()
{
//频道
$channel = $this->channel();
//创建交换机对象
$ex = $this->exchange();
//消息内容
$message = 'product message '.rand(,);
//开始事务
$channel->startTransaction();
$sendEd = true ;
foreach ($this->routes as $route) {
$sendEd = $ex->publish($message, $route) ;
echo "Send Message:".$sendEd."\n";
}
if(!$sendEd) {
$channel->rollbackTransaction();
}
$channel->commitTransaction(); //提交事务
$this->close();
die ;
}
}
try{
(new ProductMQ())->run();
}catch (\Exception $exception){
var_dump($exception->getMessage()) ;
}

ConsumerMQ.php

<?php
//消费者 C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
{
private $q_name = 'hello'; //队列名
private $route = 'hello'; //路由key /**
* ConsumerMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
parent::__construct();
} /** 接受消息 如果终止 重连时会有消息
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
* @throws \AMQPQueueException
*/
public function run()
{ //创建交换机
$ex = $this->exchange();
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
//echo "Exchange Status:".$ex->declare()."\n"; //创建队列
$q = $this->queue();
//var_dump($q->declare());exit();
$q->setName($this->q_name);
$q->setFlags(AMQP_DURABLE); //持久化
//echo "Message Total:".$q->declareQueue()."\n"; //绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n"; //阻塞模式接收消息
echo "Message:\n";
while(True){
$q->consume(function ($envelope,$queue){
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
});
//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$this->close();
}
}
try{
(new ConsumerMQ)->run();
}catch (\Exception $exception){
var_dump($exception->getMessage()) ;
}

 

最新文章

  1. Python基本数据类型
  2. windows 安装MySql
  3. 基于.NET的CAD二次开发学习笔记一:CAD开发入门
  4. POJ1426 Find The Multiple (宽搜思想)
  5. C#控制其它程序
  6. 六个字符,带你领略JavaScript (js的艺术编写)
  7. 简化LINUX的命令输入 简化linux命令
  8. 【转】【C#】无边框窗体移动的三种方法
  9. windows server 2003 AD
  10. C# 类型的创建
  11. 玩转docker镜像和镜像构建
  12. python 爬照片 模拟浏览器 先登录账号
  13. java学习(二)多态中成员变量详解
  14. 对于JavaScript中this关键字的理解
  15. LARS 最小角回归算法简介
  16. SQL 2016 正式版 安装过程
  17. python -- ajax数组传递和后台接收
  18. Android 绘图时实现双缓冲
  19. iPhone开发过程中调试多次Release问题 message sent to deallocated
  20. Educational Codeforces Round 13 B. The Same Calendar 水题

热门文章

  1. Git版本库管理
  2. Gradient Boosting, Decision Trees and XGBoost with CUDA ——GPU加速5-6倍
  3. vue实现动态异步组件
  4. PHP生成小程序二维码合成图片生成文字
  5. Linux c 获取cpu使用率(2)
  6. 初学web前端
  7. 建设工程造价数据服务云平台(计价BIM)
  8. idea使用错误及技巧总结合集(一)
  9. swiper使用中一些点的总结
  10. 如何用jmeter进行数据库性能测试