-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathparsley.php
129 lines (110 loc) · 3.08 KB
/
parsley.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?php
/**
* 基础方法文件
*
* 基于redis的任务队列,类似于python celery,可用rabbitMQ代替redis消费更可靠。
* redis有list结构,它也有zset有序集合应为source的存在,使zset有了无限可能
* @author: liukelin [email protected]
**/
include_once(dirname(__FILE__).DIRECTORY_SEPARATOR.'../config.php');
include_once(__URL__.'parsley/redisQueue.php');
include_once(__URL__.'func.php');
global $conf;
$conf = $config;
class parsley{
/**
* 将执行任务写入队列
* @param unknown $queue func.php方法名
* @param unknown $args 参数(顺序array)
* @return boolean
*/
public function apply_async($queue, $args=array()){
global $conf;
if(empty($queue)){
return false;
}
$key = microtime(true)*10000;
$arr = array(
'fun'=>$queue,
'args'=>$args,
'key'=>$key,
);
$data = json_encode($arr);
# push redis zset
$redis = new redisQueue();
$redis->zadd($conf['queue_key'] , $key, $data);
return true;
}
/**
* 消费队列数据
*/
public function digestion_queue_data(){
global $conf;
$redis = new redisQueue();
while (1){
try {
$element = $redis->zlPop($conf['queue_key']);
}catch (Exception $e){
$redis = new redisQueue();
$element = $redis->zlPop($conf['queue_key']);
}
if(empty($element) && !$conf['keep']){
break;
}
$data = json_decode($element,true);
if (empty($data['fun'])) {
continue;
}
//执行
$ret = $this->call_func($data['fun'], $data['args']);
if ($ret==false) {
//消费失败计数
$incr = $redis->incr($element.'_error_no');
if($conf['again']>=0 && $conf['again']>=$incr){
/**
*消费失败 数据回归队列top100位置(头部/尾部/top2)
*避免放回头部,如果重复消费失败 阻塞任务
*/
$newSource = 0;
$posElement = $redis->zRange($conf['queue_key'],100,100,true);
if(!empty($posElement)){
$posElement = $redis->zRange($conf['queue_key'],-1,-1,true);//插入最后
}
$newSource = (!empty($posElement[0]))?reset($posElement):$data['key'];
$redis->zadd($conf['queue_key'] , $newSource, $element);
}
}
$redis->del($element.'_error_no');
$this->setLog(__URL__.$conf['logs'], date('Y-m-d H:i:s').",执行:{$element},return:{$ret}");
$element = null;
$data = null;
}
}
/**
* 执行方法
* @param unknown $queue 方法名
* @param unknown $args 方法参数
* @return mixed|boolean
*/
public function call_func($queue, $args=array()){
global $conf;
$ex = array();
try {
$ex = explode('.',$queue);
if (count($ex)>1) {
$c = new $ex[0];
return call_user_func_array(array($c,$ex[1]), $args);
}
return call_user_func_array($queue, $args);
}catch(Exception $e){
$this->setLog(__URL__.$conf['error_logs'],date('Y-m-d H:i:s').",执行:{$queue}".json_encode(array($args)).",return:{$e}");
return false;
}
}
public function setLog($file,$msg){
$file = @str_replace('{date}', date('Ymd'), $file);
$myfile = @fopen($file, "a+");
@fwrite($myfile, $msg."\r\n");
@fclose($myfile);
}
}