基于Beanstalkd的消息队列实践

  1. beanstalk
  2. Beanstalkd特点
  3. 性能
  4. 安装
  5. client队列投放
  6. worker:处理出队列的数据
  7. result
  8. 参考文章

beanstalk

beanstalk中文魔豆,是一个轻量级消息中间件,他的最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列 (work-queue):

enter image description here

Beanstalkd设计里面的核心概念:

####job
一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

####tube
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

####producer
Job的生产者,通过put命令来将一个job放到一个tube中。

####consumer
Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

Beanstalkd特点

####任务优先级 (priority):
任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).

####延时任务 (delay):
有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with )。这种机制可以实现分布式的 java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。

####任务超时重发 (time-to-run):
Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

####任务预留 (buried):
如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick 能够一次性把 n 条被保留的任务踢回队列。

####Beanstalkd协议中文版
本博也在之前将师兄翻译的beanstalkd中文协议搬了过来做备用,现在涉及的队列方面的东西越多,就觉得这些资料着实重要。
http://www.fzb.me/2015/03/21/beanstalkd-protocol-chinese-version/

性能

参考数据:

入队列 worker处理
delayed job 200 jobs/sec 120 jobs/sec
resque 3800 jobs/sec 300 jobs/sec
rabbitmq 2500 jobs/sec 1300 jobs/sec
beanstalk 9000 jobs/sec 5200 jobs/sec

安装

源码下载:https://github.com/kr/beanstalkd/archive/v1.10.tar.gz

启动参数:

➜  ~  beanstalkd --help
Options:
 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

➜  ~  beanstalkd -l 0.0.0.0 -p 11300 -f 300

client队列投放

use Pheanstalk\Pheanstalk;
// 投放短信发送定时器

$beanstalk = new Pheanstalk('127.0.0.1', '11300', '2');

$data = array(
'mobile' => '13051662435',
'id' => '11',
'time' => '2015-05-01 10:30:00'
);
$delay = (int) strtotime($data['time']) - time();

$beanstalkd->useTube('Sms')
->put(json_encode($data), 1024, $delay, 2);

worker:处理出队列的数据

class Worker {

private $path;

public function __construct($path) {
$this->setBasePath($path);
$this->log('starting');
$this->pheanstalk = new Pheanstalk('127.0.0.1:11300');
}

public function __destruct() {
$this->log('ending');
}

private function setBasePath($path) {
$this->path = $path;
}

public function run() {
$this->log('starting to run');
$cnt = 0;
$done_jobs = array();

while(1) {
$job = $this->pheanstalk->watch('test')->ignore('default')->reserve();
$job_encoded = json_decode($job->getData(), false);
$done_jobs[] = $job_encoded;
$this->log('job:'.print_r($job_encoded, 1));
$this->pheanstalk->delete($job);
$cnt++;

$memory = memory_get_usage();

$this->log('memory:' . $memory);

if($memory > 1000000) {
$this->log('exiting run due to memory limit');
exit;
}

usleep(10);
}
}

private function log($txt) {
file_put_contents($this->path . '/log/worker.txt', $txt . "\n", FILE_APPEND);
}
}

Picking up things from the queue
$worker = new Worker(dirname($argv[0]));
$worker->run();

result

{"mobile":"13051662435","id":"11","time":"2015-05-01 10:30:00"}

具体实践图示:
enter image description here

PS:
1)可使用supervisor或deamontools等将php worker.php变为守护进程.
2)其它语言类库参考:https://github.com/kr/beanstalkd/wiki/client-libraries
3)根据不同业务需求,还可以在失败时设定reverse的时间做成轮询的方式。

参考文章

http://ccvita.com/395.html
http://adam.herokuapp.com/past/2010/4/24/beanstalk_a_simple_and_fast_queueing_backend/
http://my.oschina.net/u/698121/blog/157092
http://in355hz.iteye.com/blog/1395727

script>