Docker + Redis + Beanstalkd + Swoole构建健壮的队列
使用到的技术有docker + redis + beanstalkd + swoole
1 2 3 4 5 6 7 8 9 10 11 12 13
| docker pull redis:5.0.7 docker pull schickling/beanstalkd
docker images
docker run --name beanstalkd -d -it -p 11300:11300 428 docker run --name redis01 -d -it -p 6380:6379 redis:5.0.7 d: cd D:\phpstudy_pro\WWW\test\beanstalkd
composer require pda/pheanstalk:3.1 composer require predis/predis:1.1
|
|
性能 |
可靠性(ack应答) |
可扩展性 |
| kafka |
8w/s |
不可靠 |
集群 |
| rabitMQ |
4w/s |
可靠 |
集群 |
| redis |
8w/s |
不可靠 |
集群 |
| beanstalk |
8w/s |
可靠 |
手动构建 |
beanstalkd 是一个高性能、轻量级的内存队列系统
beanstalkd特性
支持优先级(支持队伍插队)
延迟(实现定时任务)
持久化(定时把内存中的数据刷到binlog日志)
预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败重新进入队列)
beanstalkd核心元素
生产者->管道(tube)->任务(job)->消费者
job: 一个需要异步处理的任务,需要放在一个tube中
tube: 一个有名字的任务队列,用来存储统一类型的job,可以创建多个管道
producer: job的生产者
consumer:job的消费者
流程:由producer产生一个任务job,并将任务job推进到一个tube中,然后由consumer从tube中取出job执行
composer.json
1 2 3 4
| "require":{ "pda/pheanstalk":"^3.1", "predis/predis": "^1.1", }
|
生产者产生任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <?php
require "vendor/autoload.php";
try{ $p = new \Pheanstalk\Pheanstalk('127.0.0.1',11300); swoole_timer_tick(10,function() use ($p)){ $data = [ 'msg_id' => session_create_id(),//php7.1新出的生成随机的id 'tid'=>time().uniqid(), ]; $id = $p->putInTube('task',json_encode($data)); var_dump($id); }
}catch(Exception $e){
}
|
消费者执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <?php
require "vendor/autoload.php";
$workerNum = 5; $pool = new Swoole\Process\Pool($workerNum);
$pool->on('WorderStart',function($pool,$workerId){ echo 'Worder#'.$workderId.'is started'; try{ $p = new \Pheanstalk\Pheanstalk('127.0.0.1',11300); $redis = new Predis\Client('tcp://127.0.0.1:6380'); while(true){ $job = $p->watch('task')->reserve(); if(!$empty($job)){ $json = $job->getData(); $data = json_decode($json,true); $state = $redis->get('job:'.$data["msg_id"]); if($state == 1){ $p->release($job,0,5); }elseif($state == 2){ continue; }else{ $redis->setex('job:'.$data['msg_id'],6,1); sleep(5); $redis->set('job:'.$data['msg_id'],2); $p->delete($job); } } } }catch(Exception $e){
} }); $pool->on('WorderStop',function($pool,$workerId){ echo 'Worder#'.$workderId.'is stopped'; });
|