Skip to content

Commit

Permalink
Implement new KafkaConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaud-lb committed Feb 18, 2016
1 parent 4499e32 commit dc3da0b
Show file tree
Hide file tree
Showing 14 changed files with 1,410 additions and 42 deletions.
20 changes: 12 additions & 8 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
.deps
*.lo
*.la
*.lo
*.swp
.deps
.libs
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
acinclude.m4
aclocal.m4
autom4te.cache
Expand All @@ -19,17 +24,16 @@ include
install-sh
libtool
ltmain.sh
Makefile
Makefile.fragments
Makefile.global
Makefile.objects
missing
mkinstalldirs
modules
package.xml
rdkafka-*.tgz
run-tests.php
tests/*/*.diff
tests/*/*.out
tests/*/*.php
tests/*/*.exp
tests/*/*.log
tests/*/*.out
tests/*/*.php
tests/*/*.sh
tmp-php.ini
148 changes: 132 additions & 16 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,38 @@
#include "Zend/zend_exceptions.h"
#include "ext/spl/spl_exceptions.h"
#include "conf.h"
#include "topic_partition.h"

zend_class_entry * ce_kafka_conf;
zend_class_entry * ce_kafka_topic_conf;

static void kafka_conf_callback_dtor(kafka_conf_callback *cb TSRMLS_DC) /* {{{ */
{
if (cb->fci.function_name) {
zval_ptr_dtor(&cb->fci.function_name);
}
} /* }}} */

void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */
{
kafka_conf_callback_dtor(&cbs->error TSRMLS_CC);
kafka_conf_callback_dtor(&cbs->rebalance TSRMLS_CC);
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback *to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
{
*to = *from;
if (to->fci.function_name) {
Z_ADDREF_P(to->fci.function_name);
}
} /* }}} */

void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from TSRMLS_DC) /* {{{ */
{
kafka_conf_callback_copy(&to->error, &from->error TSRMLS_CC);
kafka_conf_callback_copy(&to->rebalance, &from->rebalance TSRMLS_CC);
} /* }}} */

static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
{
kafka_conf_object *intern = (kafka_conf_object*)object;
Expand All @@ -41,9 +69,7 @@ static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
if (intern->u.conf) {
rd_kafka_conf_destroy(intern->u.conf);
}
if (intern->error_cb.fci.function_name) {
zval_ptr_dtor(&intern->error_cb.fci.function_name);
}
kafka_conf_callbacks_dtor(&intern->cbs TSRMLS_CC);
break;
case KAFKA_TOPIC_CONF:
if (intern->u.topic_conf) {
Expand Down Expand Up @@ -88,39 +114,94 @@ kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC)

static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
kafka_conf_object *intern = (kafka_conf_object*) opaque;
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval *retval;
zval **args[2];
zval **args[3];
zval *zrk;
zval *zerr;
zval *zreason;
TSRMLS_FETCH();

if (!intern->error_cb.fci.function_name) {
if (!opaque) {
return;
}

if (!cbs->error.fci.function_name) {
return;
}

ALLOC_INIT_ZVAL(zrk);
ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);

ALLOC_INIT_ZVAL(zerr);
ZVAL_LONG(zerr, err);

ALLOC_INIT_ZVAL(zreason);
ZVAL_STRING(zreason, reason, 1);

args[0] = &zrk;
args[0] = &zerr;
args[1] = &zreason;

intern->error_cb.fci.retval_ptr_ptr = &retval;
intern->error_cb.fci.params = args;
intern->error_cb.fci.param_count = 2;
cbs->error.fci.retval_ptr_ptr = &retval;
cbs->error.fci.params = args;
cbs->error.fci.param_count = 3;

zend_call_function(&intern->error_cb.fci, &intern->error_cb.fcc TSRMLS_CC);
zend_call_function(&cbs->error.fci, &cbs->error.fcc TSRMLS_CC);

if (retval) {
zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&zrk);
zval_ptr_dtor(&zerr);
zval_ptr_dtor(&zreason);
}

static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval *retval;
zval **args[3];
zval *zrk;
zval *zerr;
zval *zpartitions;
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->rebalance.fci.function_name) {
return;
}

ALLOC_INIT_ZVAL(zrk);
ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);

ALLOC_INIT_ZVAL(zerr);
ZVAL_LONG(zerr, err);

ALLOC_INIT_ZVAL(zpartitions);
kafka_topic_partition_list_to_array(zpartitions, partitions TSRMLS_CC);

args[0] = &zrk;
args[1] = &zerr;
args[2] = &zpartitions;

cbs->rebalance.fci.retval_ptr_ptr = &retval;
cbs->rebalance.fci.params = args;
cbs->rebalance.fci.param_count = 3;

zend_call_function(&cbs->rebalance.fci, &cbs->rebalance.fcc TSRMLS_CC);

if (retval) {
zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&zrk);
zval_ptr_dtor(&zerr);
zval_ptr_dtor(&zpartitions);
}

/* {{{ proto RdKafka\Conf::__construct() */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf___construct, 0, 0, 0)
Expand All @@ -142,8 +223,6 @@ PHP_METHOD(RdKafka__Conf, __construct)
intern->type = KAFKA_CONF;
intern->u.conf = rd_kafka_conf_new();

rd_kafka_conf_set_opaque(intern->u.conf, intern);

zend_restore_error_handling(&error_handling TSRMLS_CC);
}
/* }}} */
Expand Down Expand Up @@ -268,17 +347,53 @@ PHP_METHOD(RdKafka__Conf, setErrorCb)

Z_ADDREF_P(fci.function_name);

if (intern->error_cb.fci.function_name) {
zval_ptr_dtor(&intern->error_cb.fci.function_name);
if (intern->cbs.error.fci.function_name) {
zval_ptr_dtor(&intern->cbs.error.fci.function_name);
}

intern->error_cb.fci = fci;
intern->error_cb.fcc = fcc;
intern->cbs.error.fci = fci;
intern->cbs.error.fcc = fcc;

rd_kafka_conf_set_error_cb(intern->u.conf, kafka_conf_error_cb);
}
/* }}} */

/* {{{ proto void RdKafka\Conf::setRebalanceCb(mixed $callback)
Set rebalance callback for use with coordinated consumer group balancing */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_rebalance_cb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Conf, setRebalanceCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *intern;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) {
return;
}

intern = get_kafka_conf_object(this_ptr TSRMLS_CC);
if (!intern) {
return;
}

Z_ADDREF_P(fci.function_name);

if (intern->cbs.rebalance.fci.function_name) {
zval_ptr_dtor(&intern->cbs.rebalance.fci.function_name);
}

intern->cbs.rebalance.fci = fci;
intern->cbs.rebalance.fcc = fcc;

rd_kafka_conf_set_rebalance_cb(intern->u.conf, kafka_conf_rebalance_cb);
}
/* }}} */


/* {{{ proto RdKafka\TopicConf::__construct() */
PHP_METHOD(RdKafka__TopicConf, __construct)
{
Expand Down Expand Up @@ -352,6 +467,7 @@ static const zend_function_entry kafka_conf_fe[] = {
PHP_ME(RdKafka__Conf, dump, arginfo_kafka_conf_dump, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Conf, set, arginfo_kafka_conf_set, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Conf, setErrorCb, arginfo_kafka_conf_set_error_cb, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Conf, setRebalanceCb, arginfo_kafka_conf_set_rebalance_cb, ZEND_ACC_PUBLIC)
PHP_FE_END
};

Expand Down
24 changes: 20 additions & 4 deletions conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
+----------------------------------------------------------------------+
*/

#ifndef KAFKA_CONF_H
#define KAFKA_CONF_H

enum {
MSG_PARTITIONER_RANDOM = 2
#ifdef HAVE_RD_KAFKA_MSG_PARTIIONER_CONSISTENT
Expand All @@ -28,21 +31,34 @@ typedef enum {
KAFKA_TOPIC_CONF
} kafka_conf_type;

typedef struct _kafka_conf_callback {
zend_fcall_info fci;
zend_fcall_info_cache fcc;
} kafka_conf_callback;

typedef struct _kafka_conf_callbacks {
zval rk;
kafka_conf_callback error;
kafka_conf_callback rebalance;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
zend_object std;
kafka_conf_type type;
union {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
} u;
struct {
zend_fcall_info fci;
zend_fcall_info_cache fcc;
} error_cb;
kafka_conf_callbacks cbs;
} kafka_conf_object;

kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC);
void kafka_conf_minit(TSRMLS_D);

void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC);
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from TSRMLS_DC);

extern zend_class_entry * ce_kafka_conf;
extern zend_class_entry * ce_kafka_topic_conf;

#endif /* KAFKA_CONF_H */
13 changes: 11 additions & 2 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ if test "$PHP_RDKAFKA" != "no"; then

PHP_ADD_INCLUDE($RDKAFKA_DIR/include)

SOURCES="rdkafka.c metadata.c metadata_broker.c metadata_topic.c metadata_partition.c metadata_collection.c compat.c conf.c topic.c queue.c message.c"

LIBNAME=rdkafka
LIBSYMBOL=rd_kafka_new

Expand All @@ -45,8 +47,15 @@ if test "$PHP_RDKAFKA" != "no"; then
],[
AC_MSG_WARN([no rd_kafka_msg_partitioner_consistent, the consistent partitioner will not be available])
])


AC_CHECK_LIB($LIBNAME,[rd_kafka_subscribe],[
AC_DEFINE(HAVE_NEW_KAFKA_CONSUMER,1,[ ])
SOURCES="$SOURCES kafka_consumer.c topic_partition.c"
],[
AC_MSG_WARN([no rd_kafka_subscribe, new KafkaConsumer will not be available])
])

PHP_SUBST(RDKAFKA_SHARED_LIBADD)

PHP_NEW_EXTENSION(rdkafka, rdkafka.c metadata.c metadata_broker.c metadata_topic.c metadata_partition.c metadata_collection.c compat.c conf.c topic.c queue.c message.c, $ext_shared)
PHP_NEW_EXTENSION(rdkafka, $SOURCES, $ext_shared)
fi
Loading

0 comments on commit dc3da0b

Please sign in to comment.