消息持久化

前言


通过上一节,我们知道,有消息确认机制,保证了当消费者进程挂掉后,消息的不丢失。

但是如果rabbitmq挂掉呢?它的队列和消息都会丢失的。为了保证消息在rabbitmq挂掉重启后不丢失,

我们需要用到rabbitmq的持久化机制。

开启持久化功能


1. 首先保证queue的持久化,在publisher和consumer声明queue时,开启持久化功能。本章例子可以通过下面代码开启

//函数第三个参数置为true,代表开启队列的持久化
$channel->queue_declare('durable_queue', false, true, false, false);

2. 其次保证message的持久化,在publisher传递消息前开启消息的持久化功能。

$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

代码


sender.php

<?php
/**
* sender.php
* Created by PhpStorm.
* User: wangdaxi
* Date: 2017/10/18
* Time: 14:26
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel(); $channel->queue_declare('durable_queue', false, true, false, false); $data = implode(" ", array_slice($argv, 1));
empty($data) && $data = "Hello World!"; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($msg, '', 'durable_queue'); echo " [x] Sent '$data'\n"; //close the channel and connection;
$channel->close();
$connection->close();

receive.php

<?php
/**
* receive.php
* Created by PhpStorm.
* User: wangdaxi
* Date: 2017/10/18
* Time: 14:34
*/
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel(); $channel->queue_declare('durable_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) {
echo "[x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo "[x] Done\n";
//消息确认
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('durable_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) {
$channel->wait();
}

测试


1. php sender.php 这里开启了队列,并且发送了一条消息‘hello world’

2. sudo rabbitmqctl stop_app 关闭服务

3. sudo rabbitmqctl start_app 开启服务

4. php receive.php 开启消费者进程,发现能从队列【durable_queue】接收到消息。

以上。

最新文章

  1. WebApi官方系列
  2. Unity3D访问Android系统目录
  3. asp.net链接数据库问题,设置收藏本站,设置主页
  4. 使用Maven构建多模块项目
  5. SQL Server 查看物理页存储
  6. paper 25 :SVM支持向量机是什么意思?
  7. Full GC有关问题学习分析(转载)
  8. E437: terminal capability &quot;cm&quot; required
  9. LaTex 高中数学公式
  10. 开发ContentProvider的步骤
  11. 【转】QT QString, wchar_t *, TCHAR, CString和其他字符或字符串类型的转化
  12. MongoDB副本集的常用操作及原理
  13. POJ 3468 A Simple Problem with Integers(树状数组区间更新)
  14. net user命令详解
  15. visual studio各种新建项目和新建项简介
  16. [区块链] 密码学——Merkle 树
  17. Shell工具| 流程控制
  18. 高版本sonar安装遇到的坑-sonar 6.6
  19. gentoo raid1
  20. Windows上使用sqlite3

热门文章

  1. 可视化库-Matplotlib-盒图(第四天)
  2. tfrecord
  3. ansible 使用
  4. 打劫房屋 &#183; House Robber
  5. java代码实现网络远程开机
  6. Qt的安装和使用中的常见问题(详细版)
  7. QTcpSocket-Qt使用Tcp通讯实现服务端和客户端
  8. 一条java开发工程师的升级路线,从初级到无语言障碍
  9. spark配置文件和执行部分代码
  10. 树莓派3下安装TL-WN722N无线网卡驱动