Skip to content

Commit 3cab832

Browse files
committed
add predis handler
1 parent ebe6d16 commit 3cab832

File tree

12 files changed

+530
-50
lines changed

12 files changed

+530
-50
lines changed

composer.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Queues for CodeIgniter 4 framework",
44
"license": "MIT",
55
"type": "library",
6-
"keywords": ["codeigniter", "codeigniter4", "queue"],
6+
"keywords": ["codeigniter", "codeigniter4", "queue", "database", "redis", "predis"],
77
"authors": [
88
{
99
"name": "michalsn",
@@ -18,7 +18,7 @@
1818
"require-dev": {
1919
"codeigniter4/devkit": "^1.0",
2020
"codeigniter4/framework": "^4.4",
21-
"rector/rector": "0.18.6"
21+
"predis/predis": "^2.0"
2222
},
2323
"minimum-stability": "dev",
2424
"prefer-stable": true,
@@ -32,6 +32,10 @@
3232
"Tests\\": "tests"
3333
}
3434
},
35+
"suggest": {
36+
"ext-redis": "If you want to use RedisHandler",
37+
"predis/predis": "If you want to use PredisHandler"
38+
},
3539
"config": {
3640
"allow-plugins": {
3741
"phpstan/extension-installer": true

docs/configuration.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Available options:
1515
- [$defaultHandler](#defaultHandler)
1616
- [$handlers](#handlers)
1717
- [$database](#database)
18+
- [$redis](#redis)
19+
- [$predis](#predis)
1820
- [$keepDoneJobs](#keepDoneJobs)
1921
- [$keepFailedJobs](#keepFailedJobs)
2022
- [$queueDefaultPriority](#queueDefaultPriority)
@@ -27,7 +29,7 @@ The default handler used by the library. Default value: `database`.
2729

2830
### $handlers
2931

30-
An array of available handlers. By now only `database` handler is implemented.
32+
An array of available handlers. By now only `database`, `redis` and `predis` handlers are implemented.
3133

3234
### $database
3335

@@ -36,6 +38,29 @@ The configuration settings for `database` handler.
3638
* `dbGroup` - The database group to use. Default value: `default`.
3739
* `getShared` - Weather to use shared instance. Default value: `true`.
3840

41+
### $redis
42+
43+
The configuration settings for `redis` handler. You need to have a [ext-redis](https://github.com/phpredis/phpredis) installed to use it.
44+
45+
* `host` - The host name or unix socket. Default value: `127.0.0.1`.
46+
* `password` - The password. Default value: `null`.
47+
* `port` - The port number. Default value: `6379`.
48+
* `timeout` - The timeout for connection. Default value: `0`.
49+
* `database` - The database number. Default value: `0`.
50+
* `prefix` - The default key prefix. Default value: `''` (not set).
51+
52+
### $predis
53+
54+
The configuration settings for `predis` handler. You need to have [Predis](https://github.com/predis/predis) installed to use it.
55+
56+
* `scheme` - The scheme to use: `tcp`, `tls` or `unix`. Default value: `tcp`.
57+
* `host` - The host name. Default value: `127.0.0.1`.
58+
* `password` - The password. Default value: `null`.
59+
* `port` - The port number (when `tcp`). Default value: `6379`.
60+
* `timeout` - The timeout for connection. Default value: `5`.
61+
* `database` - The database number. Default value: `0`.
62+
* `prefix` - The default key prefix. Default value: `''` (not set).
63+
3964
### $keepDoneJobs
4065

4166
If the job is done, should we keep it in the table? Default value: `false`.

phpstan.neon.dist

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,21 @@ parameters:
2424
message: '#Call to deprecated function random_string\(\):#'
2525
paths:
2626
- src/Handlers/RedisHandler.php
27+
- src/Handlers/PredisHandler.php
28+
-
29+
message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#'
30+
paths:
31+
- tests/_support/Database/Seeds/TestRedisQueueSeeder.php
2732
-
2833
message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#'
2934
paths:
3035
- src/Handlers/BaseHandler.php
3136
- src/Handlers/DatabaseHandler.php
3237
- src/Handlers/RedisHandler.php
38+
- src/Handlers/PredisHandler.php
3339
- src/Models/QueueJobModel.php
40+
- tests/RedisHandlerTest.php
41+
- tests/PredisHandlerTest.php
3442
-
3543
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#'
3644
paths:

src/Config/Queue.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use CodeIgniter\Config\BaseConfig;
66
use Michalsn\CodeIgniterQueue\Exceptions\QueueException;
77
use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler;
8+
use Michalsn\CodeIgniterQueue\Handlers\PredisHandler;
89
use Michalsn\CodeIgniterQueue\Handlers\RedisHandler;
910

1011
class Queue extends BaseConfig
@@ -20,6 +21,7 @@ class Queue extends BaseConfig
2021
public array $handlers = [
2122
'database' => DatabaseHandler::class,
2223
'redis' => RedisHandler::class,
24+
'predis' => PredisHandler::class,
2325
];
2426

2527
/**
@@ -31,14 +33,28 @@ class Queue extends BaseConfig
3133
];
3234

3335
/**
34-
* Redis and Predis handler config.
36+
* Redis handler config.
3537
*/
3638
public array $redis = [
3739
'host' => '127.0.0.1',
3840
'password' => null,
3941
'port' => 6379,
4042
'timeout' => 0,
4143
'database' => 0,
44+
'prefix' => '',
45+
];
46+
47+
/**
48+
* Predis handler config.
49+
*/
50+
public array $predis = [
51+
'scheme' => 'tcp',
52+
'host' => '127.0.0.1',
53+
'password' => null,
54+
'port' => 6379,
55+
'timeout' => 5,
56+
'database' => 0,
57+
'prefix' => '',
4258
];
4359

4460
/**

src/Exceptions/QueueException.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ public static function forIncorrectHandler(): static
1111
return new self(lang('Queue.incorrectHandler'));
1212
}
1313

14+
public static function forIncorrectQueueFormat(): static
15+
{
16+
return new self(lang('Queue.incorrectQueueFormat'));
17+
}
18+
19+
public static function forTooLongQueueName(): static
20+
{
21+
return new self(lang('Queue.tooLongQueueName'));
22+
}
23+
1424
public static function forIncorrectJobHandler(): static
1525
{
1626
return new self(lang('Queue.incorrectJobHandler'));

src/Handlers/BaseHandler.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ protected function logFailed(QueueJob $queueJob, Throwable $err): bool
136136
*/
137137
protected function validateJobAndPriority(string $queue, string $job): void
138138
{
139+
// Validate queue
140+
$this->validateQueue($queue);
141+
139142
// Validate jobHandler.
140143
if (! in_array($job, array_keys($this->config->jobHandlers), true)) {
141144
throw QueueException::forIncorrectJobHandler();
@@ -150,4 +153,18 @@ protected function validateJobAndPriority(string $queue, string $job): void
150153
throw QueueException::forIncorrectQueuePriority($this->priority, $queue);
151154
}
152155
}
156+
157+
/**
158+
* Validate queue name.
159+
*/
160+
protected function validateQueue(string $queue): void
161+
{
162+
if (! preg_match('/^[a-z0-9_-]+$/', $queue)) {
163+
throw QueueException::forIncorrectQueueFormat();
164+
}
165+
166+
if (strlen($queue) > 64) {
167+
throw QueueException::forTooLongQueueName();
168+
}
169+
}
153170
}

src/Handlers/PredisHandler.php

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
3+
namespace Michalsn\CodeIgniterQueue\Handlers;
4+
5+
use CodeIgniter\Exceptions\CriticalError;
6+
use CodeIgniter\I18n\Time;
7+
use Exception;
8+
use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig;
9+
use Michalsn\CodeIgniterQueue\Entities\QueueJob;
10+
use Michalsn\CodeIgniterQueue\Enums\Status;
11+
use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface;
12+
use Michalsn\CodeIgniterQueue\Payload;
13+
use Predis\Client;
14+
use Throwable;
15+
16+
class PredisHandler extends BaseHandler implements QueueInterface
17+
{
18+
private readonly Client $predis;
19+
20+
public function __construct(protected QueueConfig $config)
21+
{
22+
try {
23+
$this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]);
24+
$this->predis->time();
25+
} catch (Exception $e) {
26+
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
27+
}
28+
}
29+
30+
/**
31+
* Add job to the queue.
32+
*/
33+
public function push(string $queue, string $job, array $data): bool
34+
{
35+
$this->validateJobAndPriority($queue, $job);
36+
37+
helper('text');
38+
39+
$queueJob = new QueueJob([
40+
'id' => random_string('numeric', 16),
41+
'queue' => $queue,
42+
'payload' => new Payload($job, $data),
43+
'priority' => $this->priority,
44+
'status' => Status::PENDING->value,
45+
'attempts' => 0,
46+
'available_at' => Time::now()->timestamp,
47+
]);
48+
49+
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]);
50+
51+
$this->priority = null;
52+
53+
return $result > 0;
54+
}
55+
56+
/**
57+
* Get job from the queue.
58+
*/
59+
public function pop(string $queue, array $priorities): ?QueueJob
60+
{
61+
$now = Time::now()->timestamp;
62+
63+
foreach ($priorities as $priority) {
64+
if ($tasks = $this->predis->zrangebyscore("queues:{$queue}:{$priority}", '-inf', $now, ['LIMIT' => [0, 1]])) {
65+
if ($this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks)) {
66+
break;
67+
}
68+
$tasks = [];
69+
}
70+
}
71+
72+
if (empty($tasks[0])) {
73+
return null;
74+
}
75+
76+
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
77+
78+
// Set the actual status as in DB.
79+
$queueJob->status = Status::RESERVED->value;
80+
$queueJob->syncOriginal();
81+
82+
$this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob));
83+
84+
return $queueJob;
85+
}
86+
87+
/**
88+
* Schedule job for later
89+
*/
90+
public function later(QueueJob $queueJob, int $seconds): bool
91+
{
92+
$queueJob->status = Status::PENDING->value;
93+
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;
94+
95+
if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) {
96+
$this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
97+
}
98+
99+
return $result > 0;
100+
}
101+
102+
/**
103+
* Move job to failed table or move and delete.
104+
*/
105+
public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
106+
{
107+
if ($keepJob) {
108+
$this->logFailed($queueJob, $err);
109+
}
110+
111+
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
112+
}
113+
114+
/**
115+
* Change job status to DONE or delete it.
116+
*/
117+
public function done(QueueJob $queueJob, bool $keepJob): bool
118+
{
119+
if ($keepJob) {
120+
$queueJob->status = Status::DONE->value;
121+
$this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]);
122+
}
123+
124+
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
125+
}
126+
127+
/**
128+
* Delete queue jobs
129+
*/
130+
public function clear(?string $queue = null): bool
131+
{
132+
if ($queue !== null) {
133+
if ($keys = $this->predis->keys("queues:{$queue}:*")) {
134+
return $this->predis->del($keys) > 0;
135+
}
136+
137+
return true;
138+
}
139+
140+
if ($keys = $this->predis->keys('queues:*')) {
141+
return $this->predis->del($keys) > 0;
142+
}
143+
144+
return true;
145+
}
146+
}

src/Handlers/RedisHandler.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public function __construct(protected QueueConfig $config)
3333
if (isset($config->redis['database']) && ! $this->redis->select($config->redis['database'])) {
3434
throw new CriticalError('Queue: Redis select database failed.');
3535
}
36+
37+
if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) {
38+
throw new CriticalError('Queue: Redis setting prefix failed.');
39+
}
3640
} catch (RedisException $e) {
3741
throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').');
3842
}
@@ -59,7 +63,7 @@ public function push(string $queue, string $job, array $data): bool
5963
'available_at' => Time::now()->timestamp,
6064
]);
6165

62-
$result = $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob));
66+
$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob));
6367

6468
$this->priority = null;
6569

@@ -109,7 +113,7 @@ public function later(QueueJob $queueJob, int $seconds): bool
109113
$queueJob->status = Status::PENDING->value;
110114
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;
111115

112-
if ($result = $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) {
116+
if ($result = (int) $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) {
113117
$this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id);
114118
}
115119

@@ -152,14 +156,14 @@ public function clear(?string $queue = null): bool
152156
{
153157
if ($queue !== null) {
154158
if ($keys = $this->redis->keys("queues:{$queue}:*")) {
155-
return $this->redis->del($keys) > 0;
159+
return (int) $this->redis->del($keys) > 0;
156160
}
157161

158162
return true;
159163
}
160164

161165
if ($keys = $this->redis->keys('queues:*')) {
162-
return $this->redis->del($keys) > 0;
166+
return (int) $this->redis->del($keys) > 0;
163167
}
164168

165169
return true;

src/Language/en/Queue.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
],
88
],
99
'incorrectHandler' => 'This queue handler is incorrect.',
10+
'incorrectQueueFormat' => 'The queue name should consists only lowercase letters or numbers.',
11+
'tooLongQueueName' => 'The queue name is too long. It should be no longer than 64 letters.',
1012
'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.',
1113
'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.',
1214
'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.',

0 commit comments

Comments
 (0)