https://kr.github.io/beanstalkd/

安装

centos

yum install beanstalkd --enablerepo=epel

源码

githubphp消息队列php应用beanstalkd队列 Java

tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.10.tar.gz​cd beanstalkd​make install PERFIX=/usr/bin/beanstalkd

参考:http://kr.github.io/beanstalkd/download.html

启动

/usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog -F &

beanstalkd参数

/usr/bin/beanstalkd -hUse: /usr/bin/beanstalkd [OPTIONS]​Options:-b 开启binlog,断电后重启会自动规复任务。
-f MS fsync最多每MS毫秒-F从不fsync(默认)-l ADDR侦听地址(默认为0.0.0.0)-p端口侦听端口(默认为11300)-u USER成为用户和组-z BYTES设置最大作业大小(以字节为单位)(默认值为65535)-s BYTES设置每个wal文件的大小(默认为10485760) (将被舍入到512字节的倍数)-c压缩binlog(默认)- n 没关系缩binlog-v显示版本信息-V增加冗长度-h显示这个帮助配置文件

/etc/sysconfig/beanstalkd观点

核心观点

job:一个须要异步处理的任务,是 Beanstalkd 中的基本单元,须要放在一个 tube 中。
tube:一个有名的任务行列步队,用来存储统一类型的 job,是 producer 和 consumer 操作的工具。
producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。
consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态。

job 的生命周期

当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待韶光过后才迁移到READY状态。
consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。
当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以连续获取和实行该job;故意思的是bury操作,可以把该job休眠,等到须要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。
正是有这些有趣的操作和状态,才可以基于此做出很多意思的运用,比如要实现一个循环行列步队,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

READY - 须要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;DELAYED - 延迟实行的任务, 当消费者处理任务后, 可以用将再次放回 DELAYED 行列步队延迟实行;RESERVED - 已经被消费者获取, 正在实行的任务。
Beanstalkd 卖力检讨任务是否在 TTR(time-to-run) 内完成;BURIED - 保留的任务: 任务不会被实行,也不会消逝,除非有人把它 \公众踢\"大众 回行列步队;DELETED - 被彻底删除。
Beanstalkd 不再坚持这些。
特性

优先级

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级,默认优先级为1024。

持久化

可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来规复之前的job及状态。

分布式容错

分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。

超时掌握

为了防止某个consumer永劫光占用任务但不能处理的情形,Beanstalkd为reserve操作设置了timeout韶光,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer实行。

客户端

项目地址:https://github.com/pda/pheanstalk/

1、向行列步队中添加job

<?php​//创建行列步队​require_once('./vendor/autoload.php');​use Pheanstalk\Pheanstalk;​$pheanstalk = new Pheanstalk('127.0.0.1',11300);​$tubeName='user_eamil_message_list';​$jobData=array( 'uid' => time(), 'email' => 'wukong@qq.com', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'),);​$pheanstalk ->useTube( $tubeName) ->put( json_encode( $jobData));​echo json_encode($jobData).PHP_EOL;echo 'Success ~~'.PHP_EOL;

2、从行列步队中取出job

<?php

//消费行列步队​require_once('./vendor/autoload.php');​use Pheanstalk\Pheanstalk;​$pheanstalk = new Pheanstalk('127.0.0.1',11300);​$tubeName='user_eamil_message_list';​while(true){ //获取行列步队信息,reserve 壅塞获取 $job = $pheanstalk ->watch($tubeName) ->ignore('default') ->reserve(); $data=$job->getData();​ //实行干系逻辑代码 $ret = file_put_contents('./send_mail.log',$data,FILE_APPEND | LOCK_EX); if( $ret ){ echo '实行成功'.PHP_EOL.$data.PHP_EOL; $pheanstalk->delete($job); }​ //停息(不可能是百分百的准确,跟系统的调度、CPU时钟周期等有一定关系) usleep(500000);}​echo 'Success ~~'.PHP_EOL;

3、检讨做事状态

<?php​//监控做事状态require_once('./vendor/autoload.php');​use Pheanstalk\Pheanstalk;​$pheanstalk = new Pheanstalk('127.0.0.1',11300);$isAlive = $pheanstalk->getConnection()->isServiceListening(); ​var_dump($isAlive);​/​可以开拓监控面板,监控数据的,有多少tube,多少行列步队,多少延迟等等

//查看beanstalkd状态//var_dump($pheanstalk->stats());​//查看有多少个tube//var_dump($pheanstalk->listTubes());​//设置要监听的tube$pheanstalk->watch('test');​//取消对默认tube的监听,可以省略$pheanstalk->ignore('default');​//查看监听的tube列表//var_dump($pheanstalk->listTubesWatched());​//查看test的tube当前的状态//var_dump($pheanstalk->statsTube('test'));​/

测试:

生产:

$ php producer.php {\"大众uid\"大众:1499569740,\"大众email\公众:\公众wukong@qq.com\公众,\"大众message\"大众:\公众Hello World !!\"大众,\公众dtime\公众:\"大众2017-07-09 11:09:00\"大众}Success ~~​php producer.php {\公众uid\"大众:1499569742,\"大众email\"大众:\"大众wukong@qq.com\公众,\"大众message\"大众:\"大众Hello World !!\公众,\"大众dtime\公众:\公众2017-07-09 11:09:02\"大众}Success ~~​ php producer.php {\"大众uid\"大众:1499569744,\"大众email\"大众:\"大众wukong@qq.com\"大众,\"大众message\"大众:\"大众Hello World !!\公众,\"大众dtime\"大众:\"大众2017-07-09 11:09:04\"大众}Success ~~

消费:

$ php consumer.php 实行成功{\"大众uid\公众:1499569740,\公众email\"大众:\"大众wukong@qq.com\"大众,\"大众message\"大众:\公众Hello World !!\"大众,\公众dtime\公众:\"大众2017-07-09 11:09:00\公众}实行成功{\"大众uid\"大众:1499569742,\公众email\"大众:\"大众wukong@qq.com\公众,\"大众message\"大众:\"大众Hello World !!\"大众,\公众dtime\"大众:\"大众2017-07-09 11:09:02\公众}实行成功{\"大众uid\公众:1499569744,\"大众email\公众:\公众wukong@qq.com\"大众,\"大众message\"大众:\公众Hello World !!\"大众,\"大众dtime\公众:\公众2017-07-09 11:09:04\"大众}

PHP监控

https://github.com/kr/beanstalkd/wiki/Tools

https://github.com/ptrofimov/beanstalk_console

https://github.com/jimbojsb/bstools

参考:

https://segmentfault.com/a/1190000002784775

http://www.jianshu.com/p/413676e1696f