Skip to content

Commit

Permalink
Added conf debug context that dumps the configuration on client instantiation
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Dec 16, 2020
1 parent a850afd commit c40941d
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ and the sticky consumer group partition assignor.
which has vast performance improvements.
* Added `rd_kafka_conf_get_default_topic_conf()` to retrieve the
default topic configuration object from a global configuration object.
* Added `conf` debugging context to `debug` - shows set configuration
properties on client and topic instantiation. Sensitive properties
are redacted.


## Fixes
Expand Down
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10
topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean*
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce(). <br>*Type: integer*
topic.blacklist | * | | | low | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist. <br>*Type: pattern list*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
socket.timeout.ms | * | 10 .. 300000 | 60000 | low | Default timeout for network requests. Producer: ProduceRequests will use the lesser value of `socket.timeout.ms` and remaining `message.timeout.ms` for the first message in the batch. Consumer: FetchRequests will use `fetch.wait.max.ms` + `socket.timeout.ms`. Admin: Admin requests will use `socket.timeout.ms` or explicitly set `rd_kafka_AdminOptions_set_operation_timeout()` value. <br>*Type: integer*
socket.blocking.max.ms | * | 1 .. 60000 | 1000 | low | **DEPRECATED** No longer used. <br>*Type: integer*
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | low | Broker socket send buffer size. System default is used if 0. <br>*Type: integer*
Expand Down
1 change: 1 addition & 0 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,7 @@ admin | admin | Admin API debugging.
eos | producer | Idempotent Producer debugging.
mock | * | Mock cluster functionality debugging.
assignor | consumer | Detailed consumer group partition assignor debugging.
conf | * | Display set configuration properties on startup.
all | * | All of the above.


Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2460,6 +2460,18 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
/* Log warnings for deprecated configuration */
rd_kafka_conf_warn(rk);

/* Debug dump configuration */
if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL,
&rk->rk_conf,
"Client configuration");
if (rk->rk_conf.topic_conf)
rd_kafka_anyconf_dump_dbg(
rk, _RK_TOPIC,
rk->rk_conf.topic_conf,
"Default topic configuration");
}

/* Free user supplied conf's base pointer on success,
* but not the actual allocated fields since the struct
* will have been copied in its entirety above. */
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ const char *rd_kafka_get_debug_contexts(void);
* Use rd_kafka_get_debug_contexts() instead.
*/
#define RD_KAFKA_DEBUG_CONTEXTS \
"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor"
"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf"


/* @cond NO_DOC */
Expand Down
88 changes: 67 additions & 21 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ RD_KAFKA_DBG_EOS, "eos" },
{ RD_KAFKA_DBG_MOCK, "mock" },
{ RD_KAFKA_DBG_ASSIGNOR, "assignor" },
{ RD_KAFKA_DBG_CONF, "conf" },
{ RD_KAFKA_DBG_ALL, "all" }
} },
{ _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
Expand Down Expand Up @@ -724,7 +725,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"`SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required.",
_UNSUPPORTED_OPENSSL_1_0_2
},
{ _RK_GLOBAL, "ssl.key.location", _RK_C_STR,
{ _RK_GLOBAL|_RK_SENSITIVE, "ssl.key.location", _RK_C_STR,
_RK(ssl.key_location),
"Path to client's private key (PEM) used for authentication.",
_UNSUPPORTED_SSL
Expand All @@ -740,7 +741,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"Client's private key string (PEM format) used for authentication.",
_UNSUPPORTED_SSL
},
{ _RK_GLOBAL, "ssl_key", _RK_C_INTERNAL,
{ _RK_GLOBAL|_RK_SENSITIVE, "ssl_key", _RK_C_INTERNAL,
_RK(ssl.key),
"Client's private key as set by rd_kafka_conf_set_ssl_cert()",
.dtor = rd_kafka_conf_cert_dtor,
Expand Down Expand Up @@ -911,13 +912,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
0, 86400*1000, 60*1000,
_UNSUPPORTED_WIN32_GSSAPI
},
{ _RK_GLOBAL|_RK_HIGH, "sasl.username", _RK_C_STR,
_RK(sasl.username),
"SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
{ _RK_GLOBAL|_RK_HIGH, "sasl.password", _RK_C_STR,
_RK(sasl.password),
"SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" },
{ _RK_GLOBAL, "sasl.oauthbearer.config", _RK_C_STR,
{ _RK_GLOBAL|_RK_HIGH|_RK_SENSITIVE, "sasl.username", _RK_C_STR,
_RK(sasl.username),
"SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
{ _RK_GLOBAL|_RK_HIGH|_RK_SENSITIVE, "sasl.password", _RK_C_STR,
_RK(sasl.password),
"SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" },
{ _RK_GLOBAL|_RK_SENSITIVE, "sasl.oauthbearer.config", _RK_C_STR,
_RK(sasl.oauthbearer_config),
"SASL/OAUTHBEARER configuration. The format is "
"implementation-dependent and must be parsed accordingly. The "
Expand Down Expand Up @@ -959,8 +960,10 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ _RK_GLOBAL, "plugin.library.paths", _RK_C_STR,
_RK(plugin_paths),
"List of plugin libraries to load (; separated). "
"The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the "
"platform-specific extension (such as .dll or .so) will be appended automatically.",
"The library search path is platform dependent (see dlopen(3) for "
"Unix and LoadLibrary() for Windows). If no filename extension is "
"specified the platform-specific extension (such as .dll or .so) "
"will be appended automatically.",
#if WITH_PLUGINS
.set = rd_kafka_plugins_conf_set
#else
Expand Down Expand Up @@ -2345,6 +2348,13 @@ rd_kafka_anyconf_prop_desensitize (int scope, void *conf,
break;
}

case _RK_C_INTERNAL:
/* This is typically a pointer to something, the
* _RK_SENSITIVE flag is set to get it redacted in
* ..dump_dbg(), but we don't have to desensitize
* anything here. */
break;

default:
rd_assert(!*"BUG: Don't know how to desensitize prop type");
break;
Expand Down Expand Up @@ -3068,7 +3078,9 @@ rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,


static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
size_t *cntp) {
size_t *cntp,
rd_bool_t only_modified,
rd_bool_t redact_sensitive) {
const struct rd_kafka_property *prop;
char **arr;
int cnt = 0;
Expand All @@ -3082,19 +3094,27 @@ static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
if (!(prop->scope & scope))
continue;

if (only_modified && !rd_kafka_anyconf_is_modified(conf, prop))
continue;

/* Skip aliases, show original property instead.
* Skip invalids. */
if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
continue;

/* Query value size */
if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) !=
RD_KAFKA_CONF_OK)
continue;
if (redact_sensitive && (prop->scope & _RK_SENSITIVE)) {
val = rd_strdup("[redacted]");
} else {
/* Query value size */
if (rd_kafka_anyconf_get0(conf, prop, NULL,
&val_size) !=
RD_KAFKA_CONF_OK)
continue;

/* Get value */
val = malloc(val_size);
rd_kafka_anyconf_get0(conf, prop, val, &val_size);
/* Get value */
val = rd_malloc(val_size);
rd_kafka_anyconf_get0(conf, prop, val, &val_size);
}

arr[cnt++] = rd_strdup(prop->name);
arr[cnt++] = val;
Expand All @@ -3107,12 +3127,16 @@ static const char **rd_kafka_anyconf_dump (int scope, const void *conf,


const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) {
return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp);
return rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp,
rd_false/*all*/,
rd_false/*don't redact*/);
}

const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf,
size_t *cntp) {
return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp);
return rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp,
rd_false/*all*/,
rd_false/*don't redact*/);
}

void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
Expand All @@ -3126,6 +3150,28 @@ void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
rd_free(_arr);
}



/**
 * @brief Dump the modified (non-default) configuration properties of
 *        \p conf to \p rk's debug log, one "name = value" line per
 *        property, with sensitive values redacted.
 *
 * @param rk          Client instance whose debug log is written to.
 * @param scope       Property scope to include (_RK_GLOBAL or _RK_TOPIC,
 *                    as passed by the callers in rd_kafka_new() and
 *                    rd_kafka_topic_new0()).
 * @param conf        Configuration object matching \p scope
 *                    (rd_kafka_conf_t or rd_kafka_topic_conf_t).
 * @param description Header line logged before the property list
 *                    (only emitted if at least one property is dumped).
 */
void rd_kafka_anyconf_dump_dbg (rd_kafka_t *rk, int scope, const void *conf,
const char *description) {
const char **arr;
size_t cnt;
size_t i;

/* arr holds alternating name,value strings: cnt = 2 * property count. */
arr = rd_kafka_anyconf_dump(scope, conf, &cnt,
rd_true/*modified only*/,
rd_true/*redact sensitive*/);
if (cnt > 0)
rd_kafka_dbg(rk, CONF, "CONF", "%s:", description);
/* Step by 2: even index is the property name, odd its value. */
for (i = 0 ; i < cnt ; i += 2)
rd_kafka_dbg(rk, CONF, "CONF", " %s = %s", arr[i], arr[i+1]);

/* Frees both the strings and the array itself. */
rd_kafka_conf_dump_free(arr, cnt);
}

void rd_kafka_conf_properties_show (FILE *fp) {
const struct rd_kafka_property *prop0;
int last = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ const char *rd_kafka_topic_conf_finalize (rd_kafka_type_t cltype,

int rd_kafka_conf_warn (rd_kafka_t *rk);

void rd_kafka_anyconf_dump_dbg (rd_kafka_t *rk, int scope, const void *conf,
const char *description);

#include "rdkafka_confval.h"

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ const char *rd_kafka_purge_flags2str (int flags);
#define RD_KAFKA_DBG_EOS 0x8000
#define RD_KAFKA_DBG_MOCK 0x10000
#define RD_KAFKA_DBG_ASSIGNOR 0x20000
#define RD_KAFKA_DBG_CONF 0x40000
#define RD_KAFKA_DBG_ALL 0xfffff
#define RD_KAFKA_DBG_NONE 0x0

Expand Down
18 changes: 16 additions & 2 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ rd_kafka_topic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
rd_kafka_topic_t *rkt;
const struct rd_kafka_metadata_cache_entry *rkmce;
const char *conf_err;
const char *used_conf_str;

/* Verify configuration.
* Maximum topic name size + headers must never exceed message.max.bytes
Expand All @@ -292,10 +293,15 @@ rd_kafka_topic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
}

if (!conf) {
if (rk->rk_conf.topic_conf)
if (rk->rk_conf.topic_conf) {
conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
else
used_conf_str = "default_topic_conf";
} else {
conf = rd_kafka_topic_conf_new();
used_conf_str = "empty";
}
} else {
used_conf_str = "user-supplied";
}


Expand Down Expand Up @@ -474,6 +480,14 @@ rd_kafka_topic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
if (do_lock)
rd_kafka_wrunlock(rk);

if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
char desc[256];
rd_snprintf(desc, sizeof(desc),
"Topic \"%s\" configuration (%s)",
topic, used_conf_str);
rd_kafka_anyconf_dump_dbg(rk, _RK_TOPIC, &rkt->rkt_conf, desc);
}

return rkt;
}

Expand Down

0 comments on commit c40941d

Please sign in to comment.