Beanstalkd 的一种分布式方案

  1. 分布式实现
  2. IN ACTION
  3. DEMO
  4. 参考

本文特指的是Beanstalkd与分布式结合的一种方案。

在Beanstalkd的文档中,其实是支持分布式的,其设计思想和Memcached类似,Beanstalkd各个server之间并不知道彼此的存在,是通过client实现分布式以及根据tube名称去特定server获取job。

做成分布式服务时,客户端对于Beanstalkd服务来说,只管做好消费者的消息投放工作,而对于处理该服务的各个服务器,只需要各自消耗自己待取出的消息数据即可。那么相对应的,其实就是需要一个pool的东西,将消息从客户端到服务器的投放过程隐藏起来,达到资源的合理分配。

封装的部分,有两个通道组成,一个是消息的入口通道,主要指的是消费者的投放操作。客户端从已经配置好的Beanstalkd服务中使用自己关注的管道(tube),并针对业务投放有一定延时或非延时、指定优先级等消息。

另外一个自然是出口通道,消费者从配置文件中获取到所有Beanstalkd服务器,这些服务器中监听自己所关注的管道(tube),将符合条件的消息处理后删除,将不符合条件的消息继续投放到待取出状态,完成轮询的思想。

而封装本身,可以通过PHP扩展来完成。此处提到的就是:https://github.com/nil-zhang/php-beanstalk

分布式实现

该方案的分布式算法借助于两个Beanstalkd的客户端类库,分别是:

php-beanstalk中的分布式算法策略主要基于一致性算法(Consistent Hashing),文末的参考中有提到,在此跟一下代码。

server pool

/* server pool */
struct bsc_pool {
bsc **servers;
int num_servers;
bsc_hash_strategy_t *hash; /* hash strategy */
void *hash_state; /* strategy specific state */
struct timeval timeout; /* smallest timeout for any of the servers */
bsc_failure_callback failure_callback; /* receives notification when a server fails */
void *failure_callback_param;
};

hash生成

typedef struct bsc_hash_strategy {
bsc_hash_create_state create_state;
bsc_hash_free_state free_state;
bsc_hash_find_server find_server;
bsc_hash_add_server add_server;
} bsc_hash_strategy_t;

bsc_hash_create_state

typedef struct bsc_hash_function {
bsc_hash_function_init init;
bsc_hash_function_combine combine;
bsc_hash_function_finish finish;
} bsc_hash_function_t;

IN ACTION

  1. 安装libbeanstalkclient
$mkdir php-beanstalk
$cd php-beanstalk/
$wget https://github.com/bergundy/libbeanstalkclient/tarball/master --no-check-certificate
$mv master bergundy-libbeanstalkclient-b6ec294.tar.gz
$tar -xzvf bergundy-libbeanstalkclient-b6ec294.tar.gz
$mv bergundy-libbeanstalkclient-b6ec294 libbeanstalkclient
$cd libbeanstalkclient/
$mkdir m4
$sudo ./autogen.sh
  1. 安装php-beanstalk
$wget https://github.com/nil-zhang/php-beanstalk/tarball/master --no-check-certificate
$mv master php-beanstalk.tar.gz
$tar -xzvf php-beanstalk.tar.gz
$cd php-beanstalk
$phpize
$ ./configure
$make
$sudo make install
$sudo vim /etc/php.d/beanstalk.ini
extension = "beanstalk.so"

DEMO

下面部署两台beantalkd服务器,部署与配置的过程再不赘述,应用程序要保存键名为tube0~tube9的数据,并为之投放与管道名一致为键名(当然,只要不超过消息的长度,你放json也是没有任何问题的),value0~9为键值的数据。

我们利用telnet的方式对两台部署有beanstalkd的服务器进行监控。
脚本文件:

$bsc = new Beanstalk();

$bsc->addServer('192.168.33.10', '11300');
$bsc->addServer('192.168.33.1', '11300');

$tubes = $bsc->list_tubes();
var_dump($tubes);

for($i = 0; $i < 10; $i++)
{
$tube = "tube" . $i;
$key = "key".$i;
$value = "value".$i;

$bsc->use($tube);
$bsc->put($key, $value);
echo "$key\t$value\n";
}

echo "done\n";

扩展返回了以服务器为键名的所有管道(list-tubes):

array(20) {
["srv0_0"]=>
string(7) "default"
["srv0_1"]=>
string(4) "key1"
["srv0_2"]=>
string(4) "key3"
["srv0_3"]=>
string(4) "key4"
["srv0_4"]=>
string(4) "key6"
["srv0_5"]=>
string(4) "key8"
["srv0_6"]=>
string(5) "tube1"
["srv0_7"]=>
string(5) "tube3"
["srv0_8"]=>
string(5) "tube4"
["srv0_9"]=>
string(5) "tube6"
["srv1_0"]=>
string(7) "default"
["srv1_1"]=>
string(4) "key0"
["srv1_2"]=>
string(4) "key2"
["srv1_3"]=>
string(4) "key5"
["srv1_4"]=>
string(4) "key7"
["srv1_5"]=>
string(4) "key9"
["srv1_6"]=>
string(5) "tube0"
["srv1_7"]=>
string(5) "tube2"
["srv1_8"]=>
string(5) "tube5"
["srv1_9"]=>
string(5) "tube7"
}

192.168.33.10(第一台部署有beanstalkd的服务器)监控:

➜  ~ telnet 127.0.0.1 11300
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
list-tubes
OK 89
---
- default
- key1
- key3
- key4
- key6
- key8
- tube1
- tube3
- tube4
- tube6
- tube8

192.168.33.1(第二台部署有beantalkd的服务器)监控:

➜  ~ telnet 127.0.0.1 11300
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
list-tubes
OK 89
---
- default
- key0
- key2
- key5
- key7
- key9
- tube0
- tube2
- tube5
- tube7
- tube9

可以看到消息数据大致均匀的分布到了两台服务器上。

参考

script>