/* dyn_stats.h (from a fork of Netflix/dynomite) */
/*
* Dynomite - A thin, distributed replication layer for multi non-distributed storages.
* Copyright (C) 2014 Netflix, Inc.
*/
/*
* twemproxy - A fast and lightweight proxy for memcached protocol.
* Copyright (C) 2011 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dyn_core.h"
#include "dyn_histogram.h"
#ifndef _DYN_STATS_H_
#define _DYN_STATS_H_
/*
 * X-macro listing every per-pool metric as
 *     ACTION(name, type, description)
 * It is expanded below to build stats_pool_field_t (STATS_POOL_<name>).
 *
 * The last entry must NOT end in a line continuation: otherwise the next
 * source line (here, the following #define) is spliced into this macro
 * and the header no longer compiles.
 *
 * NOTE(review): peer_forward_error is declared STATS_GAUGE although its
 * description reads like an event count — confirm the intended type.
 */
#define STATS_POOL_CODEC(ACTION) \
    /* client behavior */ \
    ACTION( client_eof, STATS_COUNTER, "# eof on client connections") \
    ACTION( client_err, STATS_COUNTER, "# errors on client connections") \
    ACTION( client_connections, STATS_GAUGE, "# active client connections") \
    ACTION( client_read_requests, STATS_COUNTER, "# client read requests") \
    ACTION( client_write_requests, STATS_COUNTER, "# client write requests") \
    ACTION( client_dropped_requests, STATS_COUNTER, "# client dropped requests") \
    ACTION( client_non_quorum_w_responses,STATS_COUNTER, "# client non quorum write responses") \
    ACTION( client_non_quorum_r_responses,STATS_COUNTER, "# client non quorum read responses") \
    /* pool behavior */ \
    ACTION( server_ejects, STATS_COUNTER, "# times backend server was ejected") \
    /* dnode client behavior */ \
    ACTION( dnode_client_eof, STATS_COUNTER, "# eof on dnode client connections") \
    ACTION( dnode_client_err, STATS_COUNTER, "# errors on dnode client connections") \
    ACTION( dnode_client_connections, STATS_GAUGE, "# active dnode client connections") \
    ACTION( dnode_client_in_queue, STATS_GAUGE, "# dnode client requests in incoming queue") \
    ACTION( dnode_client_in_queue_bytes, STATS_GAUGE, "current dnode client request bytes in incoming queue") \
    ACTION( dnode_client_out_queue, STATS_GAUGE, "# dnode client requests in outgoing queue") \
    ACTION( dnode_client_out_queue_bytes, STATS_GAUGE, "current dnode client request bytes in outgoing queue") \
    /* peer behavior */ \
    ACTION( peer_dropped_requests, STATS_COUNTER, "# local dc peer dropped requests") \
    ACTION( peer_timedout_requests, STATS_COUNTER, "# local dc peer timedout requests") \
    ACTION( remote_peer_timedout_requests,STATS_COUNTER, "# remote dc peer timedout requests") \
    ACTION( peer_eof, STATS_COUNTER, "# eof on peer connections") \
    ACTION( peer_err, STATS_COUNTER, "# errors on peer connections") \
    ACTION( peer_timedout, STATS_COUNTER, "# timeouts on local dc peer connections") \
    ACTION( remote_peer_timedout, STATS_COUNTER, "# timeouts on remote dc peer connections") \
    ACTION( peer_connections, STATS_GAUGE, "# active peer connections") \
    ACTION( peer_forward_error, STATS_GAUGE, "# times we encountered a peer forwarding error") \
    ACTION( peer_requests, STATS_COUNTER, "# peer requests") \
    ACTION( peer_request_bytes, STATS_COUNTER, "total peer request bytes") \
    ACTION( peer_responses, STATS_COUNTER, "# peer responses") \
    ACTION( peer_response_bytes, STATS_COUNTER, "total peer response bytes") \
    ACTION( peer_ejects, STATS_COUNTER, "# times a peer was ejected") \
    ACTION( peer_in_queue, STATS_GAUGE, "# local dc peer requests in incoming queue") \
    ACTION( remote_peer_in_queue, STATS_GAUGE, "# remote dc peer requests in incoming queue") \
    ACTION( peer_in_queue_bytes, STATS_GAUGE, "current peer request bytes in incoming queue") \
    ACTION( remote_peer_in_queue_bytes, STATS_GAUGE, "current peer request bytes in incoming queue to remote DC") \
    ACTION( peer_out_queue, STATS_GAUGE, "# local dc peer requests in outgoing queue") \
    ACTION( remote_peer_out_queue, STATS_GAUGE, "# remote dc peer requests in outgoing queue") \
    ACTION( peer_out_queue_bytes, STATS_GAUGE, "current peer request bytes in outgoing queue") \
    ACTION( remote_peer_out_queue_bytes, STATS_GAUGE, "current peer request bytes in outgoing queue to remote DC") \
    ACTION( peer_mismatch_requests, STATS_COUNTER, "current dnode peer mismatched messages") \
    /* forwarder behavior */ \
    ACTION( forward_error, STATS_COUNTER, "# times we encountered a forwarding error") \
    ACTION( fragments, STATS_COUNTER, "# fragments created from a multi-vector request") \
    ACTION( stats_count, STATS_COUNTER, "# stats request")
/*
 * X-macro listing every per-server (datastore) metric as
 *     ACTION(name, type, description)
 * It is expanded below to build stats_server_field_t (STATS_SERVER_<name>).
 *
 * The last entry must NOT end in a line continuation: otherwise the next
 * source line (here, #define STATS_ADDR) is spliced into this macro and
 * the header no longer compiles.
 */
#define STATS_SERVER_CODEC(ACTION) \
    /* server behavior */ \
    ACTION( server_eof, STATS_COUNTER, "# eof on server connections") \
    ACTION( server_err, STATS_COUNTER, "# errors on server connections") \
    ACTION( server_timedout, STATS_COUNTER, "# timeouts on server connections") \
    ACTION( server_connections, STATS_GAUGE, "# active server connections") \
    ACTION( server_ejected_at, STATS_TIMESTAMP, "timestamp when server was ejected in usec since epoch") \
    ACTION( server_dropped_requests, STATS_COUNTER, "# server dropped requests") \
    ACTION( server_timedout_requests, STATS_COUNTER, "# server timedout requests") \
    /* data behavior */ \
    ACTION( read_requests, STATS_COUNTER, "# read requests") \
    ACTION( read_request_bytes, STATS_COUNTER, "total read request bytes") \
    ACTION( write_requests, STATS_COUNTER, "# write requests") \
    ACTION( write_request_bytes, STATS_COUNTER, "total write request bytes") \
    ACTION( read_responses, STATS_COUNTER, "# read responses") \
    ACTION( read_response_bytes, STATS_COUNTER, "total read response bytes") \
    ACTION( write_responses, STATS_COUNTER, "# write responses") \
    ACTION( write_response_bytes, STATS_COUNTER, "total write response bytes") \
    ACTION( in_queue, STATS_GAUGE, "# requests in incoming queue") \
    ACTION( in_queue_bytes, STATS_GAUGE, "current request bytes in incoming queue") \
    ACTION( out_queue, STATS_GAUGE, "# requests in outgoing queue") \
    ACTION( out_queue_bytes, STATS_GAUGE, "current request bytes in outgoing queue") \
    /* Redis */ \
    ACTION( redis_req_get, STATS_COUNTER, "# Redis get") \
    ACTION( redis_req_set, STATS_COUNTER, "# Redis set") \
    ACTION( redis_req_del, STATS_COUNTER, "# Redis del") \
    ACTION( redis_req_incr_decr, STATS_COUNTER, "# Redis incr or decr") \
    ACTION( redis_req_keys, STATS_COUNTER, "# Redis keys") \
    ACTION( redis_req_mget, STATS_COUNTER, "# Redis mget") \
    ACTION( redis_req_scan, STATS_COUNTER, "# Redis scan") \
    ACTION( redis_req_sort, STATS_COUNTER, "# Redis sort") \
    ACTION( redis_req_lreqm, STATS_COUNTER, "# Redis lreqm") \
    ACTION( redis_req_sunion, STATS_COUNTER, "# Redis sunion") \
    ACTION( redis_req_ping, STATS_COUNTER, "# Redis ping") \
    ACTION( redis_req_lists, STATS_COUNTER, "# Redis lists") \
    ACTION( redis_req_sets, STATS_COUNTER, "# Redis sets") \
    ACTION( redis_req_hashes, STATS_COUNTER, "# Redis hashes") \
    ACTION( redis_req_sortedsets, STATS_COUNTER, "# Redis sortedsets") \
    ACTION( redis_req_other, STATS_COUNTER, "# Redis other")
/* Default stats monitoring endpoint and aggregation period. */
#define STATS_ADDR      "0.0.0.0"       /* IPv4 wildcard address */
#define STATS_PORT      22222           /* default stats port */
#define STATS_INTERVAL  (1000 * 30)     /* 30 seconds, expressed in msec */
/*
 * Kind of value carried by a stats_metric. It presumably selects which
 * member of stats_metric.value is in use. The explicit values match the
 * implicit 0-based numbering.
 */
typedef enum stats_type {
    STATS_INVALID   = 0,
    STATS_COUNTER   = 1,    /* monotonic accumulator */
    STATS_GAUGE     = 2,    /* non-monotonic accumulator */
    STATS_TIMESTAMP = 3,    /* monotonic timestamp; units per metric description
                               (server_ejected_at says usec) */
    STATS_STRING    = 4,    /* string value */
    STATS_SENTINEL  = 5     /* end marker */
} stats_type_t;
/*
 * Commands understood by the stats/admin endpoint. The explicit values
 * match the implicit 0-based numbering of the declaration order.
 */
typedef enum {
    CMD_UNKNOWN            = 0,
    CMD_HELP               = 1,
    CMD_INFO               = 2,
    CMD_PING               = 3,
    CMD_DESCRIBE           = 4,
    CMD_STANDBY            = 5,
    CMD_WRITES_ONLY        = 6,
    CMD_RESUMING           = 7,
    CMD_NORMAL             = 8,
    CMD_BOOTSTRAPING       = 9,
    CMD_LEAVING            = 10,
    CMD_PEER_DOWN          = 11,
    CMD_PEER_UP            = 12,
    CMD_PEER_RESET         = 13,
    CMD_LOG_LEVEL_UP       = 14,
    CMD_LOG_LEVEL_DOWN     = 15,
    CMD_HISTO_RESET        = 16,
    CMD_CL_DESCRIBE        = 17,   /* cluster_describe */
    CMD_SET_CONSISTENCY    = 18,
    CMD_GET_CONSISTENCY    = 19,
    CMD_GET_TIMEOUT_FACTOR = 20,
    CMD_SET_TIMEOUT_FACTOR = 21
} stats_cmd_t;
/*
 * One named metric. `type` records the kind of value. Presumably it selects
 * the active member of `value`: counter (STATS_COUNTER, and likely
 * STATS_GAUGE), timestamp (STATS_TIMESTAMP) or str (STATS_STRING) — confirm
 * in the implementation file.
 */
struct stats_metric {
stats_type_t type; /* kind of value held in `value` */
struct string name; /* name (ref) */
union {
int64_t counter; /* accumulating counter */
int64_t timestamp; /* monotonic timestamp */
struct string str; /* store string value */
} value;
};
/*
 * Stats for one dnode server: its name plus an array of stats_metric.
 * NOTE(review): nothing else in this header references this struct (struct
 * stats_pool embeds only stats_server); it may be unused — confirm.
 */
struct stats_dnode {
struct string name; /* dnode server name (ref) */
struct array metric; /* stats_metric[] for dnode server codec */
};
/*
 * Stats for the backing datastore server. `metric` holds one stats_metric
 * per STATS_SERVER_CODEC entry, presumably indexed by stats_server_field_t.
 */
struct stats_server {
struct string name; /* server name (ref) */
struct array metric; /* stats_metric[] for server codec */
};
/*
 * Stats for a server pool. `metric` holds one stats_metric per
 * STATS_POOL_CODEC entry, presumably indexed by stats_pool_field_t. The
 * embedded `server` carries the datastore-level metrics.
 */
struct stats_pool {
struct string name; /* pool name (ref) */
struct array metric; /* stats_metric[] for pool codec */
struct stats_server server; /* stats for datastore */
};
/* Growable byte buffer used to render stats output. */
struct stats_buffer {
size_t len; /* buffer length (bytes currently used, presumably) */
uint8_t *data; /* buffer data */
size_t size; /* buffer alloc size (capacity of `data`) */
};
/*
 * Process-wide stats state. An instance is returned by stats_create() and
 * released with stats_destroy().
 *
 * Pool metrics are kept in three snapshots: (a) `current`, (b) `shadow`
 * and (c) `sum` = a + b. The `aggregate`/`updated` flags and stats_swap()
 * move data between them. This is presumably how the event loop hands
 * data to the aggregator thread `tid`; the exact protocol lives in the
 * implementation file — confirm there.
 */
struct stats {
struct context *ctx; /* context passed to stats_create() (presumably) */
uint16_t port; /* stats monitoring port */
int interval; /* stats aggregation interval (msec presumably, cf. STATS_INTERVAL) */
struct string addr; /* stats monitoring address */
int64_t start_ts; /* start timestamp of dynomite */
struct stats_buffer buf; /* info buffer */
struct stats_buffer clus_desc_buf; /* cluster_describe buffer */
struct stats_pool current; /* pool snapshot (a); a single stats_pool, not an array */
struct stats_pool shadow; /* pool snapshot (b) */
struct stats_pool sum; /* pool snapshot (c = a + b) */
pthread_t tid; /* stats aggregator thread */
int sd; /* stats descriptor (presumably the listening socket) */
/*
 * String fields for the stats output. Each `*_str` field presumably holds
 * the key emitted for the matching value or section — confirm in the
 * implementation file.
 */
struct string service_str; /* service string */
struct string service; /* service */
struct string source_str; /* source string */
struct string source; /* source */
struct string version_str; /* version string */
struct string version; /* version */
struct string uptime_str; /* uptime string */
struct string timestamp_str; /* timestamp string */
/* Keys for latency histogram percentiles / mean / max (presumably). */
struct string latency_999th_str;
struct string latency_99th_str;
struct string latency_95th_str;
struct string latency_mean_str;
struct string latency_max_str;
/* Keys for payload-size histogram percentiles / mean / max (presumably). */
struct string payload_size_999th_str;
struct string payload_size_99th_str;
struct string payload_size_95th_str;
struct string payload_size_mean_str;
struct string payload_size_max_str;
/* Keys for cross-region RTT and per-queue 99th-percentile values (presumably). */
struct string cross_region_avg_rtt;
struct string cross_region_99_rtt;
struct string client_out_queue_99;
struct string server_in_queue_99;
struct string server_out_queue_99;
struct string dnode_client_out_queue_99;
struct string peer_in_queue_99;
struct string peer_out_queue_99;
struct string remote_peer_in_queue_99;
struct string remote_peer_out_queue_99;
/* Keys for the allocation / memory counters at the end of this struct. */
struct string alloc_msgs_str;
struct string free_msgs_str;
struct string alloc_mbufs_str;
struct string free_mbufs_str;
struct string dyn_memory_str;
/* Rack and datacenter key/value pairs. */
struct string rack_str;
struct string rack;
struct string dc_str;
struct string dc;
/*
 * Flags presumably shared with the aggregator thread.
 * NOTE(review): `volatile` gives neither atomicity nor inter-thread ordering.
 * If these are written and read on different threads, C11 atomics or a lock
 * are the correct tool — confirm usage in the implementation file.
 */
volatile int aggregate; /* shadow (b) aggregate? */
volatile int updated; /* current (a) updated? */
volatile bool reset_histogram; /* histogram reset requested (presumably via CMD_HISTO_RESET) */
/* Histograms: request latency, payload size, cross-region RTT, queue depths. */
struct histogram latency_histo;
struct histogram payload_size_histo;
struct histogram cross_region_histo;
struct histogram client_out_queue;
struct histogram server_in_queue;
struct histogram server_out_queue;
struct histogram dnode_client_out_queue;
struct histogram peer_in_queue;
struct histogram peer_out_queue;
struct histogram remote_peer_in_queue;
struct histogram remote_peer_out_queue;
/* Message / mbuf allocation counts and memory usage reported in the output (presumably). */
size_t alloc_msgs;
size_t free_msgs;
uint64_t alloc_mbufs;
uint64_t free_mbufs;
uint64_t dyn_memory;
};
/*
 * One enumerator STATS_POOL_<name> per STATS_POOL_CODEC entry, in codec
 * order. STATS_POOL_NFIELD is the number of pool metrics. These values are
 * the `fidx` argument of the _stats_pool_* functions.
 */
#define STATS_POOL_FIELD_ENTRY(_name, _type, _desc) STATS_POOL_##_name,
typedef enum stats_pool_field {
    STATS_POOL_CODEC(STATS_POOL_FIELD_ENTRY)
    STATS_POOL_NFIELD
} stats_pool_field_t;
#undef STATS_POOL_FIELD_ENTRY
/*
 * One enumerator STATS_SERVER_<name> per STATS_SERVER_CODEC entry, in codec
 * order. STATS_SERVER_NFIELD is the number of server metrics. These values
 * are the `fidx` argument of the _stats_server_* functions.
 */
#define STATS_SERVER_FIELD_ENTRY(_name, _type, _desc) STATS_SERVER_##_name,
typedef enum stats_server_field {
    STATS_SERVER_CODEC(STATS_SERVER_FIELD_ENTRY)
    STATS_SERVER_NFIELD
} stats_server_field_t;
#undef STATS_SERVER_FIELD_ENTRY
/*
 * A stats/admin request: the command and its accompanying request data
 * (presumably the argument for commands such as CMD_SET_CONSISTENCY — confirm).
 */
struct stats_cmd {
stats_cmd_t cmd;
struct string req_data;
};
/*
 * Metric update/query macros. Each takes the context and a bare metric
 * name from STATS_POOL_CODEC / STATS_SERVER_CODEC (e.g. client_eof). The
 * name is pasted into the matching STATS_POOL_* / STATS_SERVER_* index.
 *
 * When DN_STATS is not defined to 1, every mutator is a statement-shaped
 * no-op and every getter evaluates to 0. In that mode no argument is
 * evaluated, so arguments must not carry side effects callers rely on.
 */
#if defined DN_STATS && DN_STATS == 1

#define stats_pool_incr(_ctx, _name) do { \
    _stats_pool_incr(_ctx, STATS_POOL_##_name); \
} while (0)

#define stats_pool_decr(_ctx, _name) do { \
    _stats_pool_decr(_ctx, STATS_POOL_##_name); \
} while (0)

#define stats_pool_incr_by(_ctx, _name, _val) do { \
    _stats_pool_incr_by(_ctx, STATS_POOL_##_name, _val); \
} while (0)

#define stats_pool_decr_by(_ctx, _name, _val) do { \
    _stats_pool_decr_by(_ctx, STATS_POOL_##_name, _val); \
} while (0)

#define stats_pool_set_ts(_ctx, _name, _val) do { \
    _stats_pool_set_ts(_ctx, STATS_POOL_##_name, _val); \
} while (0)

#define stats_pool_get_ts(_ctx, _name) \
    _stats_pool_get_ts(_ctx, STATS_POOL_##_name)

#define stats_pool_set_val(_ctx, _name, _val) do { \
    _stats_pool_set_val(_ctx, STATS_POOL_##_name, _val); \
} while (0)

#define stats_pool_get_val(_ctx, _name) \
    _stats_pool_get_val(_ctx, STATS_POOL_##_name)

#define stats_server_incr(_ctx, _name) do { \
    _stats_server_incr(_ctx, STATS_SERVER_##_name); \
} while (0)

#define stats_server_decr(_ctx, _name) do { \
    _stats_server_decr(_ctx, STATS_SERVER_##_name); \
} while (0)

#define stats_server_incr_by(_ctx, _name, _val) do { \
    _stats_server_incr_by(_ctx, STATS_SERVER_##_name, _val); \
} while (0)

#define stats_server_decr_by(_ctx, _name, _val) do { \
    _stats_server_decr_by(_ctx, STATS_SERVER_##_name, _val); \
} while (0)

#define stats_server_set_ts(_ctx, _name, _val) do { \
    _stats_server_set_ts(_ctx, STATS_SERVER_##_name, _val); \
} while (0)

#define stats_server_get_ts(_ctx, _name) \
    _stats_server_get_ts(_ctx, STATS_SERVER_##_name)

#define stats_server_set_val(_ctx, _name, _val) do { \
    _stats_server_set_val(_ctx, STATS_SERVER_##_name, _val); \
} while (0)

#define stats_server_get_val(_ctx, _name) \
    _stats_server_get_val(_ctx, STATS_SERVER_##_name)

#else

/*
 * Stats disabled. The full macro set is still provided (the original
 * omitted stats_pool_set_ts/stats_pool_get_ts here). Mutators expand to
 * do { } while (0) so they stay a single statement, e.g. in an unbraced
 * if/else. Getters expand to (0) so they remain usable in expressions.
 */
#define stats_pool_incr(_ctx, _name)                do { } while (0)
#define stats_pool_decr(_ctx, _name)                do { } while (0)
#define stats_pool_incr_by(_ctx, _name, _val)       do { } while (0)
#define stats_pool_decr_by(_ctx, _name, _val)       do { } while (0)
#define stats_pool_set_ts(_ctx, _name, _val)        do { } while (0)
#define stats_pool_get_ts(_ctx, _name)              (0)
#define stats_pool_set_val(_ctx, _name, _val)       do { } while (0)
#define stats_pool_get_val(_ctx, _name)             (0)

#define stats_server_incr(_ctx, _name)              do { } while (0)
#define stats_server_decr(_ctx, _name)              do { } while (0)
#define stats_server_incr_by(_ctx, _name, _val)     do { } while (0)
#define stats_server_decr_by(_ctx, _name, _val)     do { } while (0)
#define stats_server_set_ts(_ctx, _name, _val)      do { } while (0)
#define stats_server_get_ts(_ctx, _name)            (0)
#define stats_server_set_val(_ctx, _name, _val)     do { } while (0)
#define stats_server_get_val(_ctx, _name)           (0)

#endif

/*
 * Compile-time stats switch. It mirrors DN_STATS when that macro is
 * defined, and is 0 otherwise. Without this, a C-level use of stats_enabled
 * would expand to the undeclared identifier DN_STATS.
 */
#ifdef DN_STATS
#define stats_enabled DN_STATS
#else
#define stats_enabled 0
#endif
/* Print a description of the known metrics (presumably name/type/description from the codecs). */
void stats_describe(void);
/*
 * Pool-metric primitives used by the stats_pool_* macros. `fidx` is a
 * STATS_POOL_<name> value from stats_pool_field_t.
 * NOTE(review): _stats_pool_set_ts takes int64_t, but _stats_server_set_ts
 * takes uint64_t and both get_ts variants return uint64_t. These prototypes
 * must match the definitions in the implementation file; consider unifying.
 */
void _stats_pool_incr(struct context *ctx, stats_pool_field_t fidx);
void _stats_pool_decr(struct context *ctx, stats_pool_field_t fidx);
void _stats_pool_incr_by(struct context *ctx, stats_pool_field_t fidx, int64_t val);
void _stats_pool_decr_by(struct context *ctx, stats_pool_field_t fidx, int64_t val);
void _stats_pool_set_ts(struct context *ctx, stats_pool_field_t fidx, int64_t val);
uint64_t _stats_pool_get_ts(struct context *ctx,stats_pool_field_t fidx);
void _stats_pool_set_val(struct context *ctx,stats_pool_field_t fidx, int64_t val);
int64_t _stats_pool_get_val(struct context *ctx,
stats_pool_field_t fidx);
/*
 * Server-metric primitives used by the stats_server_* macros. `fidx` is a
 * STATS_SERVER_<name> value from stats_server_field_t.
 */
void _stats_server_incr(struct context *ctx, stats_server_field_t fidx);
void _stats_server_decr(struct context *ctx, stats_server_field_t fidx);
void _stats_server_incr_by(struct context *ctx, stats_server_field_t fidx, int64_t val);
void _stats_server_decr_by(struct context *ctx, stats_server_field_t fidx, int64_t val);
void _stats_server_set_ts(struct context *ctx, stats_server_field_t fidx, uint64_t val);
uint64_t _stats_server_get_ts(struct context *ctx, stats_server_field_t fidx);
void _stats_server_set_val(struct context *ctx, stats_server_field_t fidx, int64_t val);
int64_t _stats_server_get_val(struct context *ctx, stats_server_field_t fidx);
/*
 * Create the stats subsystem for `sp`/`ctx` and return it. The failure
 * return is presumably NULL — confirm. stats_port, stats_ip and
 * stats_interval configure the monitoring endpoint. `source` presumably
 * populates struct stats.source.
 * NOTE(review): the char * parameters look read-only; they could be
 * const-qualified together with the definition.
 */
struct stats *stats_create(uint16_t stats_port, char *stats_ip, int stats_interval, char *source,
struct server_pool *sp, struct context *ctx);
/* Release a struct stats obtained from stats_create(). */
void stats_destroy(struct stats *stats);
/* Swap pool snapshots (presumably current/shadow, coordinated via the aggregate/updated flags). */
void stats_swap(struct stats *stats);
/* Record one sample in the latency histogram (presumably ctx's stats->latency_histo). */
void stats_histo_add_latency(struct context *ctx, uint64_t val);
/* Record one sample in the payload-size histogram (presumably ctx's stats->payload_size_histo). */
void stats_histo_add_payloadsize(struct context *ctx, uint64_t val);
#endif