耳熟能详的行列步队(事理)

行列步队实在便是一个行列步队构造的中间件,也便是说把和内容放入到一个容器后,就可以直接的返回了,不理会等它后期处理的结果,容器里的内容会有另一个程序按照顺序进行逐个的去处理。

一个行列步队结果是这样的过程:

消息队列使用场景php脚把脚教你新闻队列典范的运用场景到底有哪些 Ruby

由一个业务系统进行入队,把(内容)逐个插入行列步队中,插入成功之后直接返回成功的结果,然后后续有一个处理系统,这个别系会把行列步队中的记录逐个进行取出并且进行处理,进行出队的操作。

行列步队有哪些运用处景

行列步队紧张利用在冗余,解耦,流量削峰,异步通讯,还有一些扩展性,排序担保等,下面我们详细来理解一下这些特性

数据冗余

比如一个订单系统,订单很多的时候,到后续须要严格的转换和记录,这个时候行列步队就可以把这些数据持久化存储在行列步队中,然后由订单处理程序进行获取,后续处理完成之后再把这条记录删除,担保每条记录都能处理完成。

系统解耦

行列步队分离了两套系统:入队系统和出队系统,办理了两套系统深度耦合的问题。
利用行列步队后,入队的系统和出队的系统是没有直接的关系的,入队系统和出队系统个中一套系统崩溃的时候,都不会影响到另一个别系的正常运转。

我们用一个别系解耦的案例来详细讲解一下:行列步队处理订单系统和配送系统

场景:在网购的时候提交订单之后,看到自己的订单货色在配送中,这样就参与进来一个别系是配送系统,如果我们在做架构的时候,把订单系统和配送系统设计到一起,就会涌现问题。
首先对付订单系统来说,订单系统处理压力较大,对付配送系统来说没必要对这些压力做及时反响,我们没必要在订单系统涌现问题的情形下,同时配送系统涌现问题,这时候就会同时影响两个别系的运转,以是我们可以用解耦来办理。

这两个别系分开之后,我们可以通过一个行列步队表来实现两个别系的沟通。
首先,订单系统会吸收用户的订单,进行订单的处理,会把这些订单写到行列步队表中,这个行列步队表是沟通两个别系的关键,由配送系统中的定时实行的程序来读取行列步队表进行处理,配送系统处理之后,会把已经处理的记录进行标记,这便是全体详细流程。

详细细节设计如下(Mysql行列步队举例):

首先,我们用order.php的文件吸收用户的订单。
然后天生订单号并对订单进行处理,订单系统处理完成之后会把配送系统须要的数据增加到行列步队表中。

订单表

CREATE TABLE `order_queue` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单的id号', `order_id` int(11) NOT NULL, `mobile` varchar(20) NOT NULL COMMENT '用户的手机号', `address` varchar(100) NOT NULL COMMENT '用户的地址', `created_at` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '订单创建的韶光', `updated_at` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '物流系统处理完成的韶光', `status` tinyint(2) NOT NULL COMMENT '当前状态,0 未处理,1 已处理,2处理中', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

然后有一个定时脚本,每分钟启动配送处理程序,配送处理程序:goods.php用来处理行列步队表中的数据,当处理完成之后,会把行列步队表中的字段状态改为处理完成,这样就结束了全体流程。
详细代码如下: 1 处理订单的order.php文件

<?phpinclude 'class/db.php'; if(!empty($_GET['mobile'])){ $order_id = rand(10000,99999).date("YmdHis").'688'; $insert_data = array( 'order_id'=>$order_id, 'mobile'=>$_GET['mobile'], //记得过滤 'created_at'=>date('Y-m-d H:i:s',time()), 'order_id'=>$order_id, 'status'=>0, //0,未处理状态 ); $db = DB::getIntance();//把数据放入行列步队表中 $res = $db->insert('order_queue',$insert_data); if($res){ echo $insert_data['order_id']."保存成功"; }else{ echo "保存失落败"; }}else{ echo "1";}?>

配送系统处理订单的文件goods.php

<?php//配送系统处理订单并进行标记include 'class/db.php';$db = DB::getIntance();//1:先要把要处理的数据状态改为待处理$waiting = array('status'=>0,);$lock = array('status'=>2,);$res_lock = $db->update('order_queue',$lock,$waiting,2);//2:选择出刚刚更新的数据,然后进行配送系统处理if($res_lock){ //选择出要处理的订单内容 $res = $db->selectAll('order_queue',$lock); //然后由配货系统进行处理.....等操作 //3:把处理过的改为已处理状态 $success = array( 'status'=>1, 'updated_at'=>date('Y-m-d H;i:s',time()), ); $res_last = $db->update('order_queue',$success,$lock); if($res_last){ echo "处理成功:".$res_last; }else{ echo "处理失落败:".$res_last; }}else{ echo "全部处理完成";}?>

定时实行脚本的goods.sh,每分钟实行一次

#!/bin/bashdate "+%G-%m-%d %H:%M:%S" //当前年月日cd /data/wwwroot/default/mq/php goods.php

然后crontab任务定时实行脚本,并创建日志文件,然后指定输出格式

/1 /data/wwwroot/default/mq/good.sh >> /data/wwwroot/default/mq/log.log 2>&1 //指定脚本目录并格式化输出//当然要创建log.log文件

监控日志

tail -f log.log //监控日志

这样订单系统和配送系统j便是相互独立的咯,并不影响另一个别系的正常运行,这便是系统解耦处理. 流量削峰 这种场景最经典的便是秒杀和抢购,这种情形会涌现很大的流量剧增,大量的需求集中在短短的几秒内,对做事器的瞬间压力非常大,我们合营缓存redis利用行列步队来有效的办理这种瞬间访问量,防止做事器顶不住而崩溃。
我们也用一个案例来理解理解:利用Redis的List类型实现秒杀。
我们会用到redis的这些函数: RPUSH/RPUSHX:将值插入到链表的尾部。
同上,位置相反 LPOP:移除并获取链表中的第一个元素。
RPOP:移除并获取链表中末了一个元素。
LTRIM:保留指定区间内的元素。
LLEN:获取链表的长度。
LSET:用索引设置链表元素的值。
LINDEX:通过索引获取链表中的元素。
LRANGE:获取链表指定例模内的元素。
场景 记录哪个用户参与了秒杀,同时记录韶光,这样方便后续处理,用户的ID会存储到【Redis】的链表里进行排队,比如打算让前10个人秒杀成功,后面的人秒杀失落败,这样让redis链表的长度保持为10就可以了,10个往后如果再到redis要求追加数据,那么程序上谢绝要求,在redis存取之后,后面的程序会对redis进行取值,由于数据不能长久放在缓存,后面有一个程序遍历处理redis的值,放入数据库永久保存,由于秒杀本来不会太长,可以用脚本循环扫描。
详细解释: 首先Redis程序会把用户的要求数据放入redis,紧张是uid和微秒韶光戳;然后检讨redis链表的长度,超出长度就放弃处理;去世循环数据读取redis链表的内容,入库。
秒杀记录表设计:

CREATE TABLE `redis_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `uid` int(11) NOT NULL DEFAULT '0', `time_stamp` varchar(24) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

吸收用户要求的程序:

<?php$redis = new Redis();$redis->connect('127.0.0.1',6379);$redis-_name = 'miaosha';//秒杀用户涌入仿照,500个用户for ($i =0; $i < 500; $i++) { $uid = rand(1000000,99999999);}//检讨redis链表长度(已存在数量)$num = 10;if ($redis->lLen($redis_name) < 10 ) { //加入链表尾部 $redis->rPush($redis_name, $uid.'%'.microtime());} else { //如果达到10个 //秒杀结束}$redis->close();

处理程序(拿到redis数据写入数据表里)

<?php//从行列步队头部读一个值,判断这个值是否存在,如果存在则切割出韶光、uid保存到数据库中。
(对付redis而言,如果从redis取出这个值,那么这个值就不在redis行列步队里了,如果涌现问题失落败了,那么我们须要有一个机制把失落败的数据重新放入redis链表中)$redis = new Redis();$redis->connect('127.0.0.1',6379);$redis-_name = 'miaosha';//去世循环检测redis行列步队while(1) { $user = $redis->lpop($redis_name); if (!$user || $user == 'null') { //如果没有数据跳出循环 //如果一贯实行,速率是非常快的,那么做事器压力大,这里2秒一次 sleep(2); //跳出循环 continue; } //拿出微秒韶光戳和uid $user_arr = explode('%', $user); $insert_data = array( 'uid' => $user_arr[0]; 'time_stamp' => $user_arr[1]; ); $res = $db->insert('redis_queue', $insert_data); //如果插入失落败 if (!$res) { //从哪个方向取出,从哪个方向插回 $redis->lpush($redis_name, $user); sleep(2); }}$redis->close();

测试的话,可以先实行循环检测脚本,然后再实行秒杀脚本开始测试,监测Mysql数据库的变革。
异步通讯 本身可以使入队的系统直接返回,以是实现了程序的异步操作,因此只要适宜于异步的场景都可以利用行列步队来实现。
下面来看详细案例: 基本知识点 重点用到了以下命令实现我们的推送。

brpop 壅塞模式 从行列步队右边获取值之后删除brpoplpush 从行列步队A的右边取值之后删除,从左侧放置到行列步队B中

逻辑剖析

在普通的任务脚本中写入push_queue行列步队要发送的目标,并为目标设置一个要推送的内容,永不过期RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,紧张防止程序崩溃造成推送失落败

RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了brpop

代码:普通任务脚本

<?phpforeach ($user_list as $item) { //命名规则 业务类型_操作_ID_随机6位 值 自定义 我自定义的是"推送内容" $k_name = 'rabbit_push_' . $item['uid'].'_'.rand(100000,999999); $redis->lPush('push_queue',$k_name);//左进行列步队 $redis->set($k_name, '推送内容');}

RedisPushQueue

<?php//行列步队处理推送~// // 守护进程运行 // nohup php YOURPATH/RedisPushQueue.php & 开启守护进程运行,修正文件之后须要重新启动// blpop 有值则回去 没值则壅塞 紧张便是这个函数在起浸染 不过并不屈安,程序在实行过程中崩溃就会导致行列步队中的内容 // 永久丢失~ // BRPOPLPUSH 壅塞模式 右边出 左边进 在填写行列步队内容的时候哀求从左进入 //ini_set('default_socket_timeout', -1); //不超时require_once 'YOURPARH/Rongcloud.php'; $redis = new Redis();$redis->connect('127.0.0.1', 6379);$redis->select(2);//切换到db2$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // temp_queue临时行列步队防止程序崩溃导致行列步队中内容丢失 0代表永不超时!
While ($key = $redis->brpoplpush('push_queue', 'temp_queue', 0)) { if ($val = $redis->get($key)) { //rabbit_push_20_175990 $arr = explode('_', $key); if (count($arr) != 4) { continue; } $id = $arr[2]; push($id, $val); //删除key内容 $redis->del($key); }}function push($id, $v){ //推送操作~}

RedisAutoDeleteTempqueueItems 自动处理temp_queue中的元素,这个操作是防止RedisPushQueue崩溃的时候做处理。
处理思路是 利用brpop 命令壅塞处理temp_queue这个行列步队中的值,如果能获取到"值"对应的"值",解释RedisPushQueue实行失落败了,将值还lpush到push_queue中,以备重新处理 至于为什么利用brpop命令,是由于在RedisPushQueue中我们利用的是brpoplpushnohup php YOURPATH/RedisAutoDeleteTempqueueItems.php & 开启守护进程运行,修正文件之后须要重新启动

<?phpini_set('default_socket_timeout', -1); //不超时$redis = new Redis();$redis->connect('127.0.0.1', 6379);$redis->select(2);//切换到db2$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); while($key_arr = $redis->brPop('temp_queue',0)){ if(count($key_arr) != 2){ continue; } $key =$key_arr[1]; if($redis->get($key)){//能获取到值 解释RedisPushQueue实行失落败 $redis->lPush('push_queue',$key); }}

更专业的行列步队,你可以利用:RabbitMQ,ActiveMq,ZeroMq,Kafka,这里就不过多的去先容这些了。

系统的学习PHP

领取办法:点赞关注

关注本专栏:PHP进阶集中营,转发文章,领取以下视频教程1、php基于tp5.1开拓微信"大众年夜众号2、Restful Api实战演习训练 3、实例学习PHP QRCode天生二维码4、2小时教你轻松搞定支付宝、微信扫码支付5 、 ThinkPHP6.0极速入门

你将收成:

理解当下最火热的微做事架构事理及其开源框架;触及swoole并发、性能优化、分布式等系列PHP高等技能 对照自己节制知识点进行查漏补缺,帮助肃清知识盲区、重构知识体系。

末了,大家如果以为本文不错就点个赞吧~!
“点关注,不迷路”,每天带你分享不一样的PHP技能资讯。

针对知识体系我总结出了互联网公司PHP程序员口试涉及到的绝大部分口试题及答案做成了文档和架构视频资料免费分享给大家(包括swoole、Redis、laravel、thinkphp、swoft、docker、分布式、高并发等架构技能资料),希望能帮助到您口试前的复习且找到一个好的事情,也节省大家在网上搜索资料的韶光来学习。

领取办法:点赞关注小编后私信【资料】获取资料领取办法!