用Redis实现PHP异步队列

最近项目中用到好多用异步队列处理的后台任务,有些心得,记录一下。

下面引用百度百科的对队列的解释:

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
简单点讲就是:先进先出

要实现一个队列,你可以根据自身服务器资源,可选数据库或者redis或者其他更高级的队列资源来实现。

简单实现,大概代码是这样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Queue{
protected $items = [];

/**
* 从队列尾部插入数据
* @param string $key 对列名称
*/
public function push($key, $value){
isset($this->items[$key]) or $this->items[$key] = [];
return array_push($this->items[$key], $value);
}

/**
* 从队列头部取出数据
* @param string $key 对列名称
* return mixed $value 数据
*/
public function pop($key){
isset($this->items[$key]) or $this->items[$key] = [];
return array_shift($this->items[$key]);
}
}

测试:

1
2
3
4
5
6
7
8
9
10
$queue = new Queue();

var_dump($queue->pop('list_1')); //输出 NULL

$queue->push('list_1', '1');
$queue->push('list_1', '2');
$queue->push('list_1', '3');

var_dump($queue->pop('list_1')); //输出 string(1) "1"
var_dump($queue->pop('list_1')); //输出 string(1) "2"

上面是一个直接存储在数组中的队列,只能在一个php生命周期中使用。我们可以把存储的方式改成别的,例如数据库或者redis什么的。

这里我们使用redis,用到redis的有序列表(list)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class RedisQueue{
protected $client; //这里使用了predis这个库连接redis
public function __construct(\Predis\Client $client)
{
$this->client = $client;
}

/**
* 从队列尾部插入数据
* @param string $key 对列名称
*/
public function push($key, $value){
return $this->client->rpush($key, $value); //从右边入队
}

/**
* 从队列头部取出数据
* @param string $key 对列名称
* @return mixed $value 数据
*/
public function pop($key){
return $this->client->lpop($key); //从左边取
}
}

你还是可以使用上面的例子测试下。

当然了,还有很多情况下,我们的队列要延时出队,我们就要使用redis的另外两种数据类型:有序集合zset和哈希hlist

最终代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class RedisQueue{

protected $client;
public function __construct(\Predis\Client $client)
{
$this->client = $client;
}

/**
* 从队列尾部插入数据
* @param string $key 对列名称
* @param int $delay 延迟多少秒
* @return mixed $value 数据
*/
public function push($key, $value, $delay = null){
if(is_null($delay)){ //不使用延迟的时候还是使用以前的rpush入队
return $this->client->rpush($key, $value);
}
$time = time() + $delay;
if($time > time()){ //还未到该入队的时间时
$hash_key = md5($this->randString(16).'_'.time().'_'.$value); //生成一个唯一key
return $this->client->transaction(function($tx) use ($key, $time, $hash_key, $value){ //使用reids事务
/** @var \Predis\Client $tx */
$tx->zadd($key.':zset', [
$hash_key=>$time //这里以生成的唯一key做 对象member 以time做分数score
]);
$tx->hset($key.':hlist', $hash_key, $value); //并且保存唯一key和值的映射
$tx->expire($key.':zset', 7*86400); //有效期7天
$tx->expire($key.':hlist', 7*86400);
});
}
}

/**
* 取出数据
* @param string $key 对列名称
*/
public function receive($key){
if($this->getLock($key.':lock')){ //使用锁保证线程安全
$keys = $this->client->zrangebyscore($key.':zset',0, time()); //按当前时间取分数小于等于当前时间的集合元素
if(!empty($keys)) {
$message_datas = $this->client->hmget($key.':hlist', $keys); //通过这些key拿到值
$this->client->transaction(function($tx) use ($key, $keys, $message_datas){ //继续使用redis事务保证数据完整
/** @var \Predis\Client $tx */
foreach ($message_datas as $i => $message_data){
$tx->rpush($key, $message_data); //这时候才是真正的入队
$hash_key = $keys[$i];
$tx->hdel($key.':hlist', $hash_key); //从hash列表删除元素
$tx->zrem($key.':zset', $hash_key); //从集合删除元素
}
});
}
$this->releaseLock($key.':lock');//释放锁
}
return $this->client->lpop($key); //出队
}

public function getLock($key){
$ret = true;
if($this->client->incr($key) != 1){
$ret = false;
}
$ttl = $this->client->ttl($key);
if($ttl == -1) { //forever
$this->client->expire($key, 60);
}
return $ret;
}

public function releaseLock($key){
$this->client->del($key);
}

protected function randString($length){
$str = '';
$strPol = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
$max = strlen($strPol) - 1;

for ($i = 0; $i < $length; $i++) {
$str .= $strPol[mt_rand(0, $max)];
}
return $str;
}
}

主要看上面入队(push)和出队(receive)方法,我代码写了详细的注释。

打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~

支付宝
微信