Skip to content

Commit

Permalink
Merge pull request mattweber#8 from Ralnoc/metric_prefix_change
Browse files Browse the repository at this point in the history
Removed args.prefix from the string normalization process.
  • Loading branch information
mattweber committed Jun 18, 2015
2 parents a383803 + ee1d9d5 commit 7f062f5
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions es2graphite.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,24 @@ def normalize(what):
else:
return '%s.%s' % (normalize(what[0]), normalize(what[1:]))

def add_metric(metrics, prefix, stat, val, timestamp):
def add_metric(metrics, prefix, metric_path, stat, val, timestamp):
if isinstance(val, bool):
val = int(val)

if prefix[-1] == 'translog' and stat == 'id':
return
elif isinstance(val, (int, long, float)) and stat != 'timestamp':
metrics.append((normalize((prefix, stat)), (timestamp, val)))
#metrics.append((prefix + '.' + normalize((stat)), (timestamp, val)))
logging.debug(prefix + '.' + normalize((metric_path, stat)))
metrics.append((prefix + '.' + normalize((metric_path, stat)), (timestamp, val)))
elif stat == 'status' and val in STATUS:
metrics.append((normalize((prefix, stat)), (timestamp, STATUS[val])))
#metrics.append((prefix + '.' + normalize((stat)), (timestamp, STATUS[val])))
logging.debug(prefix + '.' + normalize((metric_path, stat)))
metrics.append((prefix + '.' + normalize((metric_path, stat)), (timestamp, STATUS[val])))
elif stat == 'state' and val in SHARD_STATE:
metrics.append((normalize((prefix, stat)), (timestamp, SHARD_STATE[val])))
#metrics.append((prefix + '.' + normalize((stat)), (timestamp, SHARD_STATE[val])))
logging.debug(prefix + '.' + normalize((metric_path, stat)))
metrics.append((prefix + '.' + normalize((metric_path, stat)), (timestamp, SHARD_STATE[val])))

def process_node_stats(prefix, stats):
metrics = []
Expand All @@ -64,60 +70,60 @@ def process_node_stats(prefix, stats):
for node_id in stats['nodes']:
node_stats = stats['nodes'][node_id]
NODES[node_id] = node_stats['name']
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME, NODES[node_id]), node_stats)
process_section(int(time.time()), metrics, prefix, (CLUSTER_NAME, NODES[node_id]), node_stats)
return metrics

def process_cluster_health(prefix, health):
metrics = []
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME), health)
process_section(int(time.time()), metrics, prefix, (CLUSTER_NAME), health)
return metrics

def process_indices_status(prefix, status):
metrics = []
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME, 'indices'), status['indices'])
process_section(int(time.time()), metrics, prefix, (CLUSTER_NAME, 'indices'), status['indices'])
return metrics

def process_indices_stats(prefix, stats):
metrics = []
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME, 'indices', '_all'), stats['_all'])
process_section(int(time.time()), metrics, prefix, (CLUSTER_NAME, 'indices', '_all'), stats['_all'])
if args.health_level != 'cluster':
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME, 'indices'), stats['indices'])
return metrics

def process_segments_status(prefix, status):
metrics = []
process_section(int(time.time()), metrics, (prefix, CLUSTER_NAME, 'indices'), status['indices'])
process_section(int(time.time()), metrics, prefix, (CLUSTER_NAME, 'indices'), status['indices'])
return metrics

def process_section(timestamp, metrics, prefix, section):
def process_section(timestamp, metrics, prefix, metric_path, section):
for stat in section:
stat_val = section[stat]
if 'timestamp' in section:
timestamp = int(section['timestamp'] / 1000) # es has epoch in ms, graphite needs seconds

if isinstance(stat_val, dict):
process_section(timestamp, metrics, (prefix, stat), stat_val)
process_section(timestamp, metrics, prefix, (metric_path, stat), stat_val)
elif isinstance(stat_val, list):
if prefix[-1] == 'fs' and stat == 'data':
for disk in stat_val:
mount = disk['mount']
process_section(timestamp, metrics, (prefix, stat, mount), disk)
process_section(timestamp, metrics, prefix, (metric_path, stat, mount), disk)
elif prefix[-1] == 'os' and stat == 'load_average':
add_metric(metrics, prefix, (stat, '1min_avg'), stat_val[0], timestamp)
add_metric(metrics, prefix, (stat, '5min_avg'), stat_val[1], timestamp)
add_metric(metrics, prefix, (stat, '15min_avg'), stat_val[2], timestamp)
add_metric(metrics, prefix, metric_path, (stat, '1min_avg'), stat_val[0], timestamp)
add_metric(metrics, prefix, metric_path, (stat, '5min_avg'), stat_val[1], timestamp)
add_metric(metrics, prefix, metric_path, (stat, '15min_avg'), stat_val[2], timestamp)
elif prefix[-1] == 'shards' and re.match('\d+', stat) is not None:
for shard in stat_val:
shard_node = NODES[shard['routing']['node']]
process_section(timestamp, metrics, (prefix, stat, shard_node), shard)
process_section(timestamp, metrics, prefix, (metric_path, stat, shard_node), shard)
else:
for stat_idx, sub_stat_val in enumerate(stat_val):
if isinstance(sub_stat_val, dict):
process_section(timestamp, metrics, (prefix, stat, str(stat_idx)), sub_stat_val)
process_section(timestamp, metrics, prefix, (metric_path, stat, str(stat_idx)), sub_stat_val)
else:
add_metric(metrics, prefix, (stat, str(stat_idx)), sub_stat_val, timestamp)
add_metric(metrics, prefix, metric_path, (stat, str(stat_idx)), sub_stat_val, timestamp)
else:
add_metric(metrics, prefix, stat, stat_val, timestamp)
add_metric(metrics, prefix, metric_path, stat, stat_val, timestamp)

def submit_to_graphite(metrics):
graphite_socket = {'socket': socket.socket( socket.AF_INET, socket.SOCK_STREAM ),
Expand Down

0 comments on commit 7f062f5

Please sign in to comment.