博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PHP使用RabbitMQ实例
阅读量:6302 次
发布时间:2019-06-22

本文共 2851 字,大约阅读时间需要 9 分钟。

相关博文:

RabbitMQ的安装过程,工作流程,和一些基础概念已经在前面的笔记中提到了,今天在本地实现了php连接RabbitMQ,以及消息的生产和消费的过程,首先看下没有生产者和消费者的默认RabbitMQ管理界面截图:
Connections:
20190115221026.png
还没有任何连接(Connections)
Channels:
20190115221114.png
还没有任何通道(Channels)
Exchanges:
20190115221142.png
交换机只有系统默认的
Queues:
20190115221201.png
还没有任何队列
先上消费者代码consumer.php

'192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test');//连接broker$cnn = new AMQPConnection($config);if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit();}//在连接内创建一个通道$ch = new AMQPChannel($cnn);//创建一个交换机$ex = new AMQPExchange($ch);//声明路由键$routingKey = 'key_1';//声明交换机名称$exchangeName = 'exchange_1';//设置交换机名称$ex->setName($exchangeName);//设置交换机类型//AMQP_EX_TYPE_DIRECT:直连交换机//AMQP_EX_TYPE_FANOUT:扇形交换机//AMQP_EX_TYPE_HEADERS:头交换机//AMQP_EX_TYPE_TOPIC:主题交换机$ex->setType(AMQP_EX_TYPE_DIRECT);//设置交换机持久$ex->setFlags(AMQP_DURABLE);//声明交换机$ex->declareExchange();//创建一个消息队列$q = new AMQPQueue($ch);//设置队列名称$q->setName('queue_1');//设置队列持久$q->setFlags(AMQP_DURABLE);//声明消息队列$q->declareQueue();//交换机和队列通过$routingKey进行绑定$q->bind($ex->getName(), $routingKey);//接收消息并进行处理的回调方法function receive($envelope, $queue) { //休眠两秒, sleep(2); //echo消息内容 echo $envelope->getBody()."\n"; //显式确认,队列收到消费者显式确认后,会删除该消息 $queue->ack($envelope->getDeliveryTag());}//设置消息队列消费者回调方法,并进行阻塞$q->consume("receive");//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐

以上是消费者代码,打开两个命令行/终端

输入php consumer.php,消费者开始阻塞获取消息,如下图
20190115223725.png
此时再看RabbitMQ管理界面:
Connections
20190115223935.png
出现两个连接,这两个就是消费者,因为他们在阻塞着等待消息
Channels
20190115224138.png
消费者在各自的连接里都打开了一个通道
Exchanges
20190115224256.png
其中一个消费者创建了一个持久的直连交换机
Queues
20190115224418.png
消息队列已经创建,但消息数是0,因为此时还没有生产者
生产者代码publisher.php

'192.168.75.132', 'vhost' => '/', 'port' => 5672, 'login' => 'test', 'password' => 'test');$cnn = new AMQPConnection($config);if (!$cnn->connect()) { echo "Cannot connect to the broker"; exit();}$ch = new AMQPChannel($cnn);$ex = new AMQPExchange($ch);//消息的路由键,一定要和消费者端一致$routingKey = 'key_1';//交换机名称,一定要和消费者端一致,$exchangeName = 'exchange_1';$ex->setName($exchangeName);$ex->setType(AMQP_EX_TYPE_DIRECT);$ex->setFlags(AMQP_DURABLE);$ex->declareExchange();//创建10个消息for ($i=1;$i<=10;$i++){ //消息内容 $msg = array( 'data' => 'message_'.$i, 'hello' => 'world', ); //发送消息到交换机,并返回发送结果 //delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失 echo "Send Message:".$ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2))."\n"; //代码执行完毕后进程会自动退出}

以上是生产者代码

在执行之前,先关掉前面的两个消费者,打开一个命令行/终端,输入php publisher.php,由于生产者不需要阻塞,执行完进程便退出,所以现在RabbitMQ管理界面中既没有Connections也没有Channels,但是Queues已经被Exchanges投递过去了10条消息,如下图:
20190115230019.png
因为我们执行生产者之前已经关掉了全部消费者,所以此时消息在队列中等待获取;
因为在发送消息时设置了delivery_mode:2来声明消息持久化,此时如果重启RabbitMQ,消息还会恢复;此时重新执行消费者,假设还是两个,打开两个命令行/终端,输入php consumer.php,我们可以看到消息被消费,如下图:
20190115231026.png
提醒:生产者在生产消息时,如果不存在指定队列,并且没有创建队列,或者队列存在但消息路由键和交换机与队列绑定的键(路由规则)不一致(直连交换机必须一致),则消息会被交换机丢弃。
原文地址:

转载于:https://www.cnblogs.com/kingzjm/p/10373769.html

你可能感兴趣的文章
第二章 生成、打包、部署和管理应用程序及类型 2.2 将类型生成模块
查看>>
SqlCommand类
查看>>
python stat模块详解(!important)
查看>>
查询用户下有数据的表
查看>>
Android 系统联系人相关URI
查看>>
linux下的文件结构
查看>>
配置---Eclipse安装JBPM4.4插件
查看>>
AtCoder Regular Contest 094 D Worst Case【思维题】
查看>>
IntelJ 快捷键
查看>>
TCP常见的定时器及三次握手与四次挥手
查看>>
linux下web压力测试工具ab使用及详解
查看>>
[转载]Mvvm Light Toolkit for wpf/silverlight系列
查看>>
Linxu指令--crond
查看>>
Web Worker
查看>>
查看关系表
查看>>
2.4.3 日期/时间选择器
查看>>
思科路由器基本配置
查看>>
LoadRunner使用入门 进行Webservice负载測试
查看>>
Cflow使用具体解释
查看>>
同源策略:
查看>>