Re-implement ClusterCheckerTask
 - Inspect response body in case of success and failure
 - Treat empty response as a failure
 - Change metrics
  * Publish counter for each indicator, unless UP or ignored, tagged with target node name
  * Publish counter for each failure to talk to a node, tagged with target node name (see the tagging sketch below)
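
The tagging scheme described above is plain Micrometer: one counter per event, tagged with the target node (plus indicator name and status for health failures). A minimal standalone sketch follows; the SimpleMeterRegistry, host names, and tag values are illustrative, not from this commit, while the metric and tag names come from the updated _metrics.adoc below.

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class TaggedCounterSketch {
    public static void main(String[] args) {
        // Standalone registry; in Genie this is the injected MeterRegistry.
        MeterRegistry registry = new SimpleMeterRegistry();

        // One increment per non-UP, non-ignored indicator, tagged with the
        // target node plus the indicator's name and status.
        Counter failedHealthcheck = registry.counter(
            "genie.tasks.clusterChecker.failedHealthcheck.counter",
            "host", "genie-node-1.example.com",
            "healthIndicator", "diskSpace",
            "healthStatus", "OUT_OF_SERVICE"
        );
        failedHealthcheck.increment();

        // One increment per failed attempt to reach a node at all.
        registry.counter(
            "genie.tasks.clusterChecker.unreachableHost.counter",
            "host", "genie-node-2.example.com"
        ).increment();

        System.out.println(failedHealthcheck.count()); // 1.0
    }
}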
mprimi committed May 28, 2019
1 parent 2258f0c commit 4374fa1
Showing 4 changed files with 385 additions and 407 deletions.
22 changes: 11 additions & 11 deletions genie-docs/src/docs/asciidoc/_metrics.adoc
@@ -343,29 +343,29 @@ published within the local JVM and available on the Actuator `/metrics` endpoint
|JobMonitoringCoordinator
|-
|genie.tasks.clusterChecker.errorCounts.gauge
|Number of Genie nodes that the current leader is presently failing to contact
|genie.tasks.clusterChecker.unhealtyHosts.gauge
|Number of Genie nodes that the current leader is considering unhealthy
|Current amount
|ClusterCheckerTask
|-
|genie.tasks.clusterChecker.health.counter
|Counts the number of times a remote node's health check indicator returned a given status
|genie.tasks.clusterChecker.unreachableHost.counter
|Counts the number of times the leader failed to get the health status of a remote node
|count
|ClusterCheckerTask
|host, healthIndicator, healthStatus
|host
|genie.tasks.clusterChecker.lostJobs.rate
|Number of jobs marked as "lost" due to a consistent failure to contact the Genie node hosting them
|genie.tasks.clusterChecker.failedHealthcheck.counter
|Number of (non-ignored) healthcheck indicators returning a status different from UP
|count
|ClusterCheckerTask
|-
|host, healthIndicator, healthStatus
|genie.tasks.clusterChecker.unableToUpdateJob.rate
|Counts the number of times an exception was raised while trying to update the database status of a lost job
|genie.tasks.clusterChecker.jobsMarkedFailed.counter
|Number of jobs marked as "lost" due to a consistent failure to contact the Genie node hosting them
|count
|ClusterCheckerTask
|-
|status, host, exceptionClass
|genie.tasks.databaseCleanup.numDeletedClusters.gauge
|Number of terminated cluster records purged during the last database cleanup pass
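The unhealtyHosts gauge in the code below is a map-backed Micrometer gauge: it is registered once against the errorCounts map and samples Map::size whenever it is read, so no explicit update call is needed. A minimal standalone sketch of that idiom; the class name, hosts, and printout are illustrative.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.HashMap;
import java.util.Map;

public class MapSizeGaugeSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        Map<String, Integer> errorCounts = new HashMap<>();

        // The gauge holds a reference to the map and evaluates Map::size
        // lazily at read time.
        registry.gauge("genie.tasks.clusterChecker.unhealtyHosts.gauge", errorCounts, Map::size);

        errorCounts.put("genie-node-1.example.com", 1);
        errorCounts.put("genie-node-2.example.com", 3);

        // Prints 2.0: the gauge reflects the current map size when sampled.
        System.out.println(registry.get("genie.tasks.clusterChecker.unhealtyHosts.gauge").gauge().value());
    }
}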
genie-web/src/main/java/com/netflix/genie/web/tasks/leader/ClusterCheckerTask.java
@@ -17,7 +17,6 @@
*/
package com.netflix.genie.web.tasks.leader;

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Splitter;
import com.netflix.genie.common.dto.Job;
import com.netflix.genie.common.dto.JobExecution;
@@ -31,16 +30,20 @@
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
import com.netflix.genie.web.util.MetricsConstants;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointProperties;
import org.springframework.boot.actuate.health.Status;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,11 +60,10 @@
*/
@Slf4j
public class ClusterCheckerTask extends LeadershipTask {
private static final String PROPERTY_STATUS = "status";
private static final String ERROR_COUNTS_GAUGE_METRIC_NAME = "genie.tasks.clusterChecker.errorCounts.gauge";
private static final String LOST_JOBS_RATE_METRIC_NAME = "genie.tasks.clusterChecker.lostJobs.rate";
private static final String FAILED_TO_UPDATE_RATE_METRIC_NAME = "genie.tasks.clusterChecker.unableToUpdateJob.rate";
private static final String REMOTE_NODE_HEALTH_METRIC_NAME = "genie.tasks.clusterChecker.health.counter";
private static final String BAD_HOSTS_GAUGE_METRIC_NAME = "genie.tasks.clusterChecker.unhealtyHosts.gauge";
private static final String BAD_HOSTS_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.unreachableHost.counter";
private static final String BAD_HEALTH_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.failedHealthcheck.counter";
private static final String FAILED_JOBS_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.jobsMarkedFailed.counter";

private final String hostname;
private final ClusterCheckerProperties properties;
@@ -75,9 +77,6 @@ public class ClusterCheckerTask {

private final Map<String, Integer> errorCounts = new HashMap<>();

private final Counter lostJobsCounter;
private final Counter unableToUpdateJobCounter;

/**
* Constructor.
*
@@ -109,9 +108,7 @@ public ClusterCheckerTask(
this.healthIndicatorsToIgnore = Splitter.on(",").omitEmptyStrings()
.trimResults().splitToList(properties.getHealthIndicatorsToIgnore());
// Keep track of the number of nodes currently unreachable from the master
registry.gauge(ERROR_COUNTS_GAUGE_METRIC_NAME, this.errorCounts, Map::size);
this.lostJobsCounter = registry.counter(LOST_JOBS_RATE_METRIC_NAME);
this.unableToUpdateJobCounter = registry.counter(FAILED_TO_UPDATE_RATE_METRIC_NAME);
registry.gauge(BAD_HOSTS_GAUGE_METRIC_NAME, this.errorCounts, Map::size);
}

/**
@@ -134,7 +131,6 @@ public void run() {
this.updateJobsToFailedOnHost(host);
} catch (Exception e) {
log.error("Unable to update jobs on host {} due to exception", host, e);
this.unableToUpdateJobCounter.increment();
result = false;
}
} else {
@@ -150,6 +146,8 @@ private void updateJobsToFailedOnHost(final String host) {
final Set<Job> jobs = this.jobSearchService.getAllActiveJobsOnHost(host);
jobs.forEach(
job -> {
final Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
tags.add(Tag.of(MetricsConstants.TagKeys.HOST, host));
try {
this.jobPersistenceService.setJobCompletionInformation(
job.getId().orElseThrow(IllegalArgumentException::new),
@@ -159,10 +157,14 @@ private void updateJobsToFailedOnHost(final String host) {
null,
null
);
this.lostJobsCounter.increment();
} catch (final GenieException ge) {
MetricsUtils.addFailureTagsWithException(tags, ge);
log.error("Unable to update job {} to failed due to exception", job.getId(), ge);
this.unableToUpdateJobCounter.increment();
throw new RuntimeException("Failed to update job", ge);
} finally {
// Increment whenever there is an attempt to mark a job failed
// (and tag whether it was successful or not).
registry.counter(FAILED_JOBS_COUNT_METRIC_NAME, tags).increment();
}
}
);
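
The hunk above counts every attempt to mark a job failed, tagging the outcome: the tag set starts as success, is swapped for failure tags on exception, and the counter increments in finally so attempts are never missed. A self-contained sketch of that pattern; the simulateSuccess flag and helper method are hypothetical, and MetricsUtils is assumed to do no more than manage the status and exception tags.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.HashSet;
import java.util.Set;

public class AttemptCounterSketch {
    private static final MeterRegistry REGISTRY = new SimpleMeterRegistry();

    public static void main(String[] args) {
        markJobFailed("job-123", true);
        markJobFailed("job-456", false);
        // Two increments total: one tagged status=success, one status=failure.
        System.out.println(REGISTRY.get("genie.tasks.clusterChecker.jobsMarkedFailed.counter").counters().size());
    }

    static void markJobFailed(String jobId, boolean simulateSuccess) {
        Set<Tag> tags = new HashSet<>();
        tags.add(Tag.of("status", "success"));
        try {
            if (!simulateSuccess) {
                throw new IllegalStateException("simulated persistence failure");
            }
        } catch (Exception e) {
            // On failure, replace the status tag and record the exception class.
            tags.remove(Tag.of("status", "success"));
            tags.add(Tag.of("status", "failure"));
            tags.add(Tag.of("exceptionClass", e.getClass().getCanonicalName()));
        } finally {
            // Count every attempt, successful or not.
            REGISTRY.counter("genie.tasks.clusterChecker.jobsMarkedFailed.counter", tags).increment();
        }
    }
}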
@@ -177,9 +179,10 @@ private void validateHostAndUpdateErrorCount(final String host) {
log.info("Host {} is no longer unhealthy", host);
this.errorCounts.remove(host);
} else {
this.registry.counter(BAD_HOSTS_COUNT_METRIC_NAME, MetricsConstants.TagKeys.HOST, host).increment();
if (this.errorCounts.containsKey(host)) {
final int currentCount = this.errorCounts.get(host) + 1;
log.info("Host still unhealthy (check #{}): {}", host, currentCount);
log.info("Host still unhealthy (check #{}): {}", currentCount, host);
this.errorCounts.put(host, currentCount);
} else {
log.info("Marking host unhealthy: {}", host);
@@ -189,67 +192,62 @@
}

private boolean isNodeHealthy(final String host) {
//
// A node is valid and healthy if all health indicators excluding the ones mentioned in healthIndicatorsToIgnore
// are UP.
//
boolean result = true;
String responseContent;
try {
this.restTemplate.getForObject(this.scheme + host + this.healthEndpoint, String.class);
responseContent = this.restTemplate.getForObject(
this.scheme + host + this.healthEndpoint,
String.class
);
log.debug("Healtcheck retrieved successfully from: {}", host);
} catch (final HttpStatusCodeException e) {
log.error("Failed validating host {}", host, e);
try {
final Map<String, Object> responseMap = GenieObjectMapper.getMapper()
.readValue(
e.getResponseBodyAsByteArray(),
TypeFactory.defaultInstance().constructMapType(Map.class, String.class, Object.class)
);
for (Map.Entry<String, Object> responseEntry : responseMap.entrySet()) {
if (responseEntry.getValue() instanceof Map) {
final Map indicatorMap = (Map) responseEntry.getValue();

final String indicatorName = responseEntry.getKey();
final Object indicatorStatusOrNull = indicatorMap.get(PROPERTY_STATUS);

final Status indicatorStatus;
// Healthcheck returning status != 2xx should still contain a payload with health details.
log.warn("Host {} healthcheck returned code: {}", host, e.getStatusCode(), e);
responseContent = e.getResponseBodyAsString();
} catch (final RestClientException e) {
// Other failure to execute the request
log.warn("Unable to request healtcheck response from host: {}", host, e);
return false;
}

if (indicatorStatusOrNull instanceof Status) {
indicatorStatus = (Status) indicatorStatusOrNull;
} else if (indicatorStatusOrNull instanceof String) {
indicatorStatus = new Status((String) indicatorStatusOrNull);
} else {
indicatorStatus = Status.UNKNOWN;
}
// Parse response body
final HealthEndpointResponse healthEndpointResponse;
try {
healthEndpointResponse = GenieObjectMapper.getMapper()
.readValue(
responseContent,
HealthEndpointResponse.class
);
} catch (IOException ex) {
log.warn("Failed to parse healthcheck response from host: {}: {}", host, ex.getMessage());
return false;
}

//Increment counter tagged with target hostname and name of health indicator
final Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
tags.add(Tag.of(MetricsConstants.TagKeys.HOST, host));
tags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_INDICATOR, indicatorName));
tags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_STATUS, indicatorStatus.getCode()));
this.registry.counter(REMOTE_NODE_HEALTH_METRIC_NAME, tags).increment();
// Ignore the top-level health (it's not UP if this code is executing) and instead look at individual
// indicators, as some of them may be ignored.
boolean hostHealthy = true;
for (final Map.Entry<String, HealthIndicatorDetails> entry : healthEndpointResponse.getDetails().entrySet()) {
final String healthIndicatorName = entry.getKey();
final HealthIndicatorDetails healthIndicator = entry.getValue();

if (this.healthIndicatorsToIgnore.contains(indicatorName)) {
log.debug("Ignoring indicator: {}", indicatorName);
} else if (Status.UP.equals(indicatorStatus)) {
log.debug("Indicator {} is UP", indicatorName);
} else {
log.warn("Indicator {} is {} for host {}", indicatorName, indicatorStatus, host);
// Mark host as failed but keep iterating to publish metrics.
result = false;
}
}
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
log.error("Failed reading the error response when validating host {}", host, ex);
result = false;
if (this.healthIndicatorsToIgnore.contains(healthIndicatorName)) {
log.debug("Ignoring indicator: {}", healthIndicatorName);
} else if (Status.UP.getCode().equals(healthIndicator.getStatus().getCode())) {
log.debug("Indicator {} is UP", healthIndicatorName);
} else {
// Consider this host unhealthy, but keep going to publish metrics for all indicators
hostHealthy = false;
//Increment counter tagged with target hostname and name of health indicator
this.registry.counter(
BAD_HEALTH_COUNT_METRIC_NAME,
MetricsConstants.TagKeys.HOST, host,
MetricsConstants.TagKeys.HEALTH_INDICATOR, healthIndicatorName,
MetricsConstants.TagKeys.HEALTH_STATUS, healthIndicator.getStatus().getCode()
).increment();
}
} catch (final Exception e) {
log.error("Unable to reach {}", host, e);
result = false;
}
return result;
return hostHealthy;
}

/**
@@ -284,4 +282,20 @@ public void cleanup() {
int getErrorCountsSize() {
return this.errorCounts.size();
}

@Getter
@Setter
@NoArgsConstructor
private static class HealthIndicatorDetails {
private Status status;
private Map<String, Object> details;
}

@Getter
@Setter
@NoArgsConstructor
private static class HealthEndpointResponse {
private Status status;
private Map<String, HealthIndicatorDetails> details;
}
}
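
The two private DTOs above mirror the Spring Boot 2 /health payload: a top-level status plus a details map from indicator name to its own status and details. A minimal Jackson sketch of the shape the parser expects, using the tree model rather than the private DTOs; the JSON values are illustrative.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;
import java.util.Map;

public class HealthPayloadSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative Boot-2-style /health body; field names match the DTOs above.
        String json = "{"
            + "\"status\":\"DOWN\","
            + "\"details\":{"
            + "\"db\":{\"status\":\"UP\"},"
            + "\"diskSpace\":{\"status\":\"DOWN\",\"details\":{\"free\":1024}}"
            + "}}";

        JsonNode root = new ObjectMapper().readTree(json);

        // Ignore the top-level status and walk individual indicators,
        // as the task does, since some indicators may be ignored.
        Iterator<Map.Entry<String, JsonNode>> indicators = root.path("details").fields();
        while (indicators.hasNext()) {
            Map.Entry<String, JsonNode> entry = indicators.next();
            System.out.println(entry.getKey() + " -> " + entry.getValue().path("status").asText());
        }
    }
}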