Skip to content

Commit d1b1216

Browse files
PSUBSCRIBE
Implemented PSUBSCRIBE/PUNSUBSCRIBE and changed the present subscribe and unsubscribe functions into generic versions that can handle both cases.
1 parent 9eb217c commit d1b1216

File tree

3 files changed

+89
-26
lines changed

3 files changed

+89
-26
lines changed

README.markdown

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,20 @@ function f($redis, $chan, $msg) {
377377
$redis->subscribe(array('chan-1', 'chan-2', 'chan-3'), 'f'); // subscribe to 3 chans
378378
</pre>
379379

380+
## psubscribe
381+
##### Description
382+
Subscribe to channels by pattern
383+
##### Parameters
384+
*patterns*: An array of patterns to match
385+
*callback*: Either a string or an array with an object and method. The callback will get four arguments ($redis, $pattern, $channel, $message)
386+
##### Example
387+
<pre>
388+
function psubscribe($redis, $pattern, $chan, $msg) {
389+
echo "Pattern: $pattern\n";
390+
echo "Channel: $chan\n";
391+
echo "Payload: $msg\n";
392+
}
393+
</pre>
380394

381395
## publish
382396
##### Description

php_redis.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ PHP_METHOD(Redis, pipeline);
171171

172172
PHP_METHOD(Redis, publish);
173173
PHP_METHOD(Redis, subscribe);
174+
PHP_METHOD(Redis, psubscribe);
174175
PHP_METHOD(Redis, unsubscribe);
176+
PHP_METHOD(Redis, punsubscribe);
175177

176178
PHP_METHOD(Redis, getOption);
177179
PHP_METHOD(Redis, setOption);
@@ -204,6 +206,9 @@ PHPAPI void generic_empty_cmd_impl(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int
204206
PHPAPI void generic_empty_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...);
205207
PHPAPI void generic_empty_long_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...);
206208

209+
PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd);
210+
PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd);
211+
207212
PHPAPI void array_zip_values_and_scores(RedisSock *redis_sock, zval *z_tab, int use_atof TSRMLS_DC);
208213
PHPAPI int redis_response_enqueued(RedisSock *redis_sock TSRMLS_DC);
209214

redis.c

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,9 @@ static zend_function_entry redis_functions[] = {
206206

207207
PHP_ME(Redis, publish, NULL, ZEND_ACC_PUBLIC)
208208
PHP_ME(Redis, subscribe, NULL, ZEND_ACC_PUBLIC)
209+
PHP_ME(Redis, psubscribe, NULL, ZEND_ACC_PUBLIC)
209210
PHP_ME(Redis, unsubscribe, NULL, ZEND_ACC_PUBLIC)
211+
PHP_ME(Redis, punsubscribe, NULL, ZEND_ACC_PUBLIC)
210212

211213
PHP_ME(Redis, time, NULL, ZEND_ACC_PUBLIC)
212214

@@ -5274,24 +5276,19 @@ PHP_METHOD(Redis, publish)
52745276
REDIS_PROCESS_RESPONSE(redis_long_response);
52755277
}
52765278

5277-
/*
5278-
subscribe channel_1 channel_2 ... channel_n
5279-
subscribe(array(channel_1, channel_2, ..., channel_n), callback)
5280-
*/
5281-
PHP_METHOD(Redis, subscribe)
5279+
PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd)
52825280
{
52835281
zval *z_callback,*object, *array, **data;
52845282
HashTable *arr_hash;
52855283
HashPosition pointer;
52865284
RedisSock *redis_sock;
52875285
char *cmd = "", *old_cmd = NULL, *callback_ft_name;
52885286
int cmd_len, array_count, callback_ft_name_len;
5289-
52905287
zval *z_tab, **tmp;
52915288
char *type_response;
52925289

52935290
int callback_type = 0;
5294-
zval *z_o, *z_fun = NULL,*z_ret, *z_args[3];
5291+
zval *z_o, *z_fun = NULL,*z_ret, *z_args[4];
52955292
char *method_name;
52965293

52975294
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oaz|z",
@@ -5327,7 +5324,7 @@ PHP_METHOD(Redis, subscribe)
53275324
}
53285325

53295326
old_cmd = cmd;
5330-
cmd_len = spprintf(&cmd, 0, "SUBSCRIBE %s\r\n", cmd);
5327+
cmd_len = spprintf(&cmd, 0, "%s %s\r\n", sub_cmd, cmd);
53315328
efree(old_cmd);
53325329
if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) {
53335330
efree(cmd);
@@ -5344,7 +5341,7 @@ PHP_METHOD(Redis, subscribe)
53445341

53455342
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&tmp) == SUCCESS) {
53465343
type_response = Z_STRVAL_PP(tmp);
5347-
if(strcmp(type_response, "subscribe") != 0) {
5344+
if(strcmp(type_response, sub_cmd) != 0) {
53485345
efree(tmp);
53495346
efree(z_tab);
53505347
RETURN_FALSE;
@@ -5385,42 +5382,66 @@ PHP_METHOD(Redis, subscribe)
53855382
/* Multibulk Response, format : {message type, originating channel, message payload} */
53865383
while(1) {
53875384
/* call the callback with this z_tab in argument */
5388-
zval **type, **channel, **data;
5385+
zval **type, **channel, **pattern, **data;
53895386
z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock);
5387+
int is_pmsg, tab_idx = 1;
53905388

53915389
if(z_tab == NULL || Z_TYPE_P(z_tab) != IS_ARRAY) {
53925390
//ERROR
53935391
break;
53945392
}
53955393

5396-
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE) {
5394+
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE || Z_TYPE_PP(type) != IS_STRING) {
53975395
break;
53985396
}
5399-
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&channel) == FAILURE) {
5397+
5398+
// Make sure we have a message or pmessage
5399+
if(!strncmp(Z_STRVAL_PP(type), "message", 7) || !strncmp(Z_STRVAL_PP(type), "pmessage", 8)) {
5400+
// Is this a pmessage
5401+
is_pmsg = *Z_STRVAL_PP(type) == 'p';
5402+
} else {
5403+
continue; // It's not a message or pmessage
5404+
}
5405+
5406+
// If this is a pmessage, we'll want to extract the pattern first
5407+
if(is_pmsg) {
5408+
// Extract pattern
5409+
if(zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&pattern) == FAILURE) {
5410+
break;
5411+
}
5412+
}
5413+
5414+
// Extract channel and data
5415+
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&channel) == FAILURE) {
54005416
break;
5401-
}
5402-
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&data) == FAILURE) {
5417+
}
5418+
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&data) == FAILURE) {
54035419
break;
54045420
}
54055421

5406-
if(Z_TYPE_PP(type) == IS_STRING && strncmp(Z_STRVAL_PP(type), "message", 7) != 0) {
5407-
continue; /* only forwarding published messages */
5408-
}
5409-
5422+
// Always pass the Redis object through
54105423
z_args[0] = getThis();
5411-
z_args[1] = *channel;
5412-
z_args[2] = *data;
5424+
5425+
// Set up our callback args depending on the message type
5426+
if(is_pmsg) {
5427+
z_args[1] = *pattern;
5428+
z_args[2] = *channel;
5429+
z_args[3] = *data;
5430+
} else {
5431+
z_args[1] = *channel;
5432+
z_args[2] = *data;
5433+
}
54135434

54145435
switch(callback_type) {
54155436
case R_SUB_CALLBACK_CLASS_TYPE:
54165437
MAKE_STD_ZVAL(z_ret);
5417-
call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, 3, z_args TSRMLS_CC);
5438+
call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, tab_idx, z_args TSRMLS_CC);
54185439
efree(z_ret);
54195440
break;
54205441

54215442
case R_SUB_CALLBACK_FT_TYPE:
54225443
MAKE_STD_ZVAL(z_ret);
5423-
call_user_function(EG(function_table), NULL, z_fun, z_ret, 3, z_args TSRMLS_CC);
5444+
call_user_function(EG(function_table), NULL, z_fun, z_ret, tab_idx, z_args TSRMLS_CC);
54245445
efree(z_ret);
54255446
break;
54265447
}
@@ -5433,9 +5454,22 @@ PHP_METHOD(Redis, subscribe)
54335454
efree(z_fun);
54345455
}
54355456

5457+
/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN))
5458+
*/
5459+
PHP_METHOD(Redis, psubscribe)
5460+
{
5461+
generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "psubscribe");
5462+
}
5463+
5464+
/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN))
5465+
*/
5466+
PHP_METHOD(Redis, subscribe) {
5467+
generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "subscribe");
5468+
}
5469+
54365470
/**
5437-
* unsubscribe channel_0 channel_1 ... channel_n
5438-
* unsubscribe(array(channel_0, channel_1, ..., channel_n))
5471+
* [p]unsubscribe channel_0 channel_1 ... channel_n
5472+
* [p]unsubscribe(array(channel_0, channel_1, ..., channel_n))
54395473
* response format :
54405474
* array(
54415475
* channel_0 => TRUE|FALSE,
@@ -5445,7 +5479,7 @@ PHP_METHOD(Redis, subscribe)
54455479
* );
54465480
**/
54475481

5448-
PHP_METHOD(Redis, unsubscribe)
5482+
PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd)
54495483
{
54505484
zval *object, *array, **data;
54515485
HashTable *arr_hash;
@@ -5490,7 +5524,7 @@ PHP_METHOD(Redis, unsubscribe)
54905524
}
54915525

54925526
old_cmd = cmd;
5493-
cmd_len = spprintf(&cmd, 0, "UNSUBSCRIBE %s\r\n", cmd);
5527+
cmd_len = spprintf(&cmd, 0, "%s %s\r\n", unsub_cmd, cmd);
54945528
efree(old_cmd);
54955529

54965530
if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) {
@@ -5520,6 +5554,16 @@ PHP_METHOD(Redis, unsubscribe)
55205554
}
55215555
}
55225556

5557+
PHP_METHOD(Redis, unsubscribe)
5558+
{
5559+
generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "UNSUBSCRIBE");
5560+
}
5561+
5562+
PHP_METHOD(Redis, punsubscribe)
5563+
{
5564+
generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "PUNSUBSCRIBE");
5565+
}
5566+
55235567
/* {{{ proto string Redis::bgrewriteaof()
55245568
*/
55255569
PHP_METHOD(Redis, bgrewriteaof)

0 commit comments

Comments
 (0)