(国际带宽出口不稳定,可能会下载失落败,重试记次就好了)

docker pull webdevops/php-nginx:centos-7-php56 //下载镜像docker run -d -p 80:80 --name rabbitmq webdevops/php-nginx:centos-7-php56 //运行容器docker exec -ti rabbitmq /bin/bash //进入容器

进入到容器后检测下环境是否有相应扩展

cd appvi index.php

刚刚我们在运行容器的时候利用80端口,在浏览器中输入http://ip

phpconsume在PHP中若何应用RabbitMQ来实现新闻的订阅和宣布 GraphQL

搜索下没有amqp干系的信息。
下面开始安装amqp扩展。

yum install gcc librabbitmq-devel.x86_64 php56w-devel-ywget http://pecl.php.net/get/amqp-1.4.0.tgztar -zxvf amqp-1.4.0.tgzcd amqp-1.4.0phpize./configure --with-amqpmake && make install

在php.ini中开启extension=amqp.so 接着重启php-fpm 或 Web做事器

vi /etc/php.iniextension=amqp.so

我这里就直接重启容器了,如果是宿主机直接安装php环境直接重启环境。

exit //退出容器docker restart rabbitmq //重启容器

再查看phpinfo,amqp扩展已经安装好了:

publish发布

在/app路径下新建一个publish.php的文件

touch publish.phpvi publish.php

以下是PHP代码,我们先定义好用来发的交流机、行列步队、RoutingKey、等变量。

$queueName = 'superrd';$ExchangeName = 'superrd';$routeKey = 'superrd';$message = 'Hello World!';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));$connection->connect() or die(\"大众Cannot connect to the broker!\n\公众);

新建一个信道。

$channel = new AMQPChannel($connection);

新建一个交流机Exchange,并定义属性,第二章我们讲过有四种类型的交流机,这里利用直连型DIRECT。
AMQP_DURABLE代表这是一个持久化的交流机,不会以为做事器非常等成分丢失。

$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->setFlags(AMQP_DURABLE);$exchange->declareExchange();

新建一个行列步队Queue,前面也讲过生产者将发送到Exchange中,Exchange会根据绑定关系投递到行列步队,也便是如果生产者在生产时没有行列步队与之绑定就会丢失。
为了担保系统更加健硕,一样平常无论是的生产者还是消费者都会新建一遍Exchange和Queue,新建后属性不会改变。
同样AMQP_DURABLE代表这是一个持久化的行列步队,行列步队会被写入磁盘。
须要把稳的是虽然是缓存在行列步队中,但是并不是行列步队是持久化的队列队列中的便是持久化的,的持久化须要单独设置。

$queue = new AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();

通过routeKey绑定交流机和行列步队。

$queue->bind($exchangeName, $routeKey);

好了,下面可以发送了

$exchange->publish($message,$routeKey);

如果你希望也是持久化的可以利用如下的代码,实际测试结果在持久化后发布的性能低落一倍,我的磁盘是pcie的固态硬盘,如果你是机器磁盘这个性能低落估计会更明显,24核心CPU,48GB内存,pcie固态硬盘,单线程的情形下每秒可以发布2.5万旁边的非持久化,持久化之后变为变为1.2万旁边。

$exchange->publish($message,$routeKey,AMQP_NOPARAM, array('delivery_mode'=>2));

断开连接。

$connection->disconnect();

同样在发布之后可以通过WEB工具来查看是否发布成功,

查看交流机多了一个superid交流机。

查看交流机已经有superrd行列步队。

点击行列步队查看行列步队详情。
Bindings标签可以看到交流机和行列步队的绑定关系。

点击Get messages标签Get message(s)按钮可以看到行列步队中的。

到此解释我们已经将一个发布到了行列步队中。
完全的PHP代码如下。

'10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));$connection->connect() or die(\"大众Cannot connect to the broker!\n\"大众);try {$channel = new AMQPChannel($connection);$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->setFlags(AMQP_DURABLE);$exchange->declareExchange();$queue = new AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();$queue->bind($exchangeName, $routeKey);$exchange->publish($message,$routeKey);var_dump(\"大众[x] Sent 'Hello World!'\"大众);} catch (AMQPConnectionException $e) {var_dump($e);exit();}$connection->disconnect();Subscribe订阅

在/app路径下新建一个subscribe.php的文件

touch subscribe.phpvi subscribe.php

以下是PHP代码,和发布一样我们先定义好用交流机、行列步队、RoutingKey等变量。

$queueName = 'superrd';$exchangeName = 'superrd';$routeKey = 'superrd';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));$connection->connect() or die(\公众Cannot connect to the broker!\n\"大众);

新建一个信道。

$channel = new AMQPChannel($connection);

与发布一样新建交流机。

$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->setFlags(AMQP_DURABLE);$exchange->declareExchange();

新建一个行列步队Queue。

$queue = new AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();

通过routeKey绑定交流机和行列步队。

$queue->bind($exchangeName, $routeKey);

重点来了,壅塞订阅。

//壅塞模式吸收echo \"大众Message:\n\"大众;while(True){$queue->consume('processMessage');//自动ACK应答//$queue->consume('processMessage', AMQP_AUTOACK);}$conn->disconnect();/ 消费回调函数 处理/function processMessage($envelope, $q) {$msg = $envelope->getBody();echo $msg.\公众\n\"大众; //处理$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答}

把稳由于是壅塞监听,由于输出缓冲区的缘故原由用浏览器访问该文件是看不到输出的。
利用脚本访问。

php /app/subscribe.php

通过WEB工具查看行列步队。
superrd行列步队中的数已经为0。

完全的PHP代码如下。

'10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));$connection->connect() or die(\"大众Cannot connect to the broker!\n\公众);$channel = new AMQPChannel($connection);$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->setFlags(AMQP_DURABLE);$exchange->declareExchange();$queue = new AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();$queue->bind($exchangeName, $routeKey);//壅塞模式吸收echo \"大众Message:\n\公众;while(True){$queue->consume('processMessage');//自动ACK应答//$queue->consume('processMessage', AMQP_AUTOACK);}$conn->disconnect();/ 消费回调函数 处理/function processMessage($envelope, $q) {$msg = $envelope->getBody();echo $msg.\公众\n\公众; //处理$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答}