Skip to content

Commit

Permalink
Enabled background and reply time tracking on blocked on keys/blocked…
Browse files Browse the repository at this point in the history
… on background work clients (redis#7491)

This commit enables tracking time of the background tasks and on replies,
opening the door for properly tracking commands that rely on blocking / background
 work via the slowlog, latency history, and commandstats. 

Some notes:
- The time spent blocked waiting for key changes, or blocked on synchronous
  replication is not accounted for. 

- **This commit does not affect latency tracking of commands that are non-blocking
  or do not have background work.** ( meaning that it all stays the same with exception to
  `BZPOPMIN`,`BZPOPMAX`,`BRPOP`,`BLPOP`, etc... and module's commands that rely
  on background threads ). 

-  Specifically for latency history command we've added a new event class named
  `command-unblocking` that will enable latency monitoring on commands that spawn
  background threads to do the work.

- For blocking commands we're now considering the total time of a command as the
  time spent on call() + the time spent on replying when unblocked.

- For Modules commands that rely on background threads we're now considering the
  total time of a command as the time spent on call (main thread) + the time spent on
  the background thread ( if marked within `RedisModule_MeasureTimeStart()` and
  `RedisModule_MeasureTimeEnd()` ) + the time spent on replying (main thread)

To test for this feature we've added a `unit/moduleapi/blockonbackground` test that relies on
a module that blocks the client and sleeps on the background for a given time. 
- check blocked command that uses RedisModule_MeasureTimeStart() is tracking background time
- check blocked command that uses RedisModule_MeasureTimeStart() is tracking background time even in timeout
- check blocked command with multiple calls RedisModule_MeasureTimeStart()  is tracking the total background time
- check blocked command without calling RedisModule_MeasureTimeStart() is not reporting background time
  • Loading branch information
filipecosta90 authored Jan 29, 2021
1 parent b9a0500 commit f0c5052
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 4 deletions.
1 change: 1 addition & 0 deletions runtest-moduleapi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/hooks \
--single unit/moduleapi/misc \
--single unit/moduleapi/blockonkeys \
--single unit/moduleapi/blockonbackground \
--single unit/moduleapi/scan \
--single unit/moduleapi/datatype \
--single unit/moduleapi/auth \
Expand Down
29 changes: 29 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
*/

#include "server.h"
#include "slowlog.h"
#include "latency.h"
#include "monotonic.h"

int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
Expand Down Expand Up @@ -97,6 +100,20 @@ void blockClient(client *c, int btype) {
}
}

/* This function is called after a client has finished a blocking operation
* in order to update the total command duration, log the command into
* the Slow log if needed, and log the reply duration event if needed. */
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
/* Log the command into the Slow log if needed. */
if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG)) {
slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration);
/* Log the reply duration event. */
latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
}
}

/* This function is called in the beforeSleep() function of the event loop
* in order to process the pending input buffer of clients that were
* unblocked after a blocking operation. */
Expand Down Expand Up @@ -264,6 +281,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);

monotime replyTimer;
elapsedStart(&replyTimer);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
wherefrom, whereto) == C_ERR)
Expand All @@ -272,6 +291,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* to also undo the POP operation. */
listTypePush(o,value,wherefrom);
}
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
Expand Down Expand Up @@ -316,7 +336,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
zcard--;

/* Replicate the command. */
Expand Down Expand Up @@ -406,6 +429,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}

monotime replyTimer;
elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
Expand All @@ -425,6 +450,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
Expand Down Expand Up @@ -471,7 +497,10 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

moduleUnblockClient(receiver);
}
Expand Down
51 changes: 51 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

#include "server.h"
#include "cluster.h"
#include "slowlog.h"
#include "rdb.h"
#include "monotonic.h"
#include <dlfcn.h>
#include <sys/stat.h>
#include <sys/wait.h>
Expand Down Expand Up @@ -262,6 +264,9 @@ typedef struct RedisModuleBlockedClient {
int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
int unblocked; /* Already on the moduleUnblocked list. */
monotime background_timer; /* Timer tracking the start of background work */
uint64_t background_duration; /* Current command background time duration.
Used for measuring latency of blocking cmds */
} RedisModuleBlockedClient;

static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -915,6 +920,30 @@ long long RM_Milliseconds(void) {
return mstime();
}

/* Mark a point in time that will be used as the start time to calculate
* the elapsed execution time when RM_BlockedClientMeasureTimeEnd() is called.
* Within the same command, you can call multiple times
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
* to accummulate indepedent time intervals to the background duration.
* This method always return REDISMODULE_OK. */
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
elapsedStart(&(bc->background_timer));
return REDISMODULE_OK;
}

/* Mark a point in time that will be used as the end time
* to calculate the elapsed execution time.
* On success REDISMODULE_OK is returned.
* This method only returns REDISMODULE_ERR if no start time was
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
if (!bc->background_timer)
return REDISMODULE_ERR;
bc->background_duration += elapsedUs(bc->background_timer);
return REDISMODULE_OK;
}

/* Set flags defining capabilities or behavior bit flags.
*
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
Expand Down Expand Up @@ -5091,6 +5120,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL;
bc->unblocked = 0;
bc->background_duration = 0;
c->bpop.timeout = timeout;

if (islua || ismulti) {
Expand Down Expand Up @@ -5164,6 +5194,11 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
*
* In these cases, a call to RedisModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply.
*
* Measuring background time: By default the time spent in the blocked command
* is not account for the total command duration. To include such time you should
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
* or multiple times within the blocking command background work.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
Expand Down Expand Up @@ -5358,6 +5393,7 @@ void moduleHandleBlockedClients(void) {
* was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
uint64_t reply_us = 0;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
Expand All @@ -5366,9 +5402,19 @@ void moduleHandleBlockedClients(void) {
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
monotime replyTimer;
elapsedStart(&replyTimer);
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
reply_us = elapsedUs(replyTimer);
moduleFreeContext(&ctx);
}
/* Update stats now that we've finished the blocking operation.
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us);
}

/* Free privdata if any. */
if (bc->privdata && bc->free_privdata) {
Expand Down Expand Up @@ -5432,6 +5478,9 @@ void moduleBlockedClientTimedOut(client *c) {
ctx.blocked_privdata = bc->privdata;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
if (!bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, 0);
}
/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
* this case, and the user can still hook using the timeout callback. */
Expand Down Expand Up @@ -9094,6 +9143,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetBlockedClientPrivateData);
REGISTER_API(AbortBlock);
REGISTER_API(Milliseconds);
REGISTER_API(BlockedClientMeasureTimeStart);
REGISTER_API(BlockedClientMeasureTimeEnd);
REGISTER_API(GetThreadSafeContext);
REGISTER_API(GetDetachedThreadSafeContext);
REGISTER_API(FreeThreadSafeContext);
Expand Down
4 changes: 4 additions & 0 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ REDISMODULE_API int (*RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx)
REDISMODULE_API void * (*RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AbortBlock)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeStart)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeEnd)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetDetachedThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1049,6 +1051,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetBlockedClientPrivateData);
REDISMODULE_GET_API(GetBlockedClientHandle);
REDISMODULE_GET_API(AbortBlock);
REDISMODULE_GET_API(BlockedClientMeasureTimeStart);
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
Expand Down
12 changes: 8 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3620,7 +3620,7 @@ void preventCommandReplication(client *c) {
*/
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
monotime call_timer;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
static long long prev_err_count;
Expand All @@ -3646,9 +3646,10 @@ void call(client *c, int flags) {
dirty = server.dirty;
prev_err_count = server.stat_total_error_replies;
updateCachedTime(0);
start = server.ustime;
elapsedStart(&call_timer);
c->cmd->proc(c);
duration = ustime()-start;
const long duration = elapsedUs(call_timer);
c->duration = duration;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

Expand Down Expand Up @@ -3692,7 +3693,10 @@ void call(client *c, int flags) {
* arguments. */
robj **argv = c->original_argv ? c->original_argv : c->argv;
int argc = c->original_argv ? c->original_argc : c->argc;
slowlogPushEntryIfNeeded(c,argv,argc,duration);
/* If the client is blocked we will handle slowlog when it is unblocked . */
if (!(c->flags & CLIENT_BLOCKED)) {
slowlogPushEntryIfNeeded(c,argv,argc,duration);
}
}
freeClientOriginalArgv(c);

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ typedef struct client {
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
Expand Down Expand Up @@ -2405,6 +2406,7 @@ void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);

/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);
Expand Down
1 change: 1 addition & 0 deletions tests/modules/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ TEST_MODULES = \
misc.so \
hooks.so \
blockonkeys.so \
blockonbackground.so \
scan.so \
datatype.so \
auth.so \
Expand Down
Loading

0 comments on commit f0c5052

Please sign in to comment.