Skip to content

Commit

Permalink
[FLINK-10252][metrics] Handle oversized metric messages
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored and zentol committed Dec 11, 2018
1 parent 7e6feea commit 1cf6d30
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,38 @@ public static class MetricSerializationResult implements Serializable {

private static final long serialVersionUID = 6928770855951536906L;

public final byte[] serializedMetrics;
public final byte[] serializedCounters;
public final byte[] serializedGauges;
public final byte[] serializedMeters;
public final byte[] serializedHistograms;

public final int numCounters;
public final int numGauges;
public final int numMeters;
public final int numHistograms;

public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
Preconditions.checkNotNull(serializedMetrics);
public MetricSerializationResult(
byte[] serializedCounters,
byte[] serializedGauges,
byte[] serializedMeters,
byte[] serializedHistograms,
int numCounters,
int numGauges,
int numMeters,
int numHistograms) {

Preconditions.checkNotNull(serializedCounters);
Preconditions.checkNotNull(serializedGauges);
Preconditions.checkNotNull(serializedMeters);
Preconditions.checkNotNull(serializedHistograms);
Preconditions.checkArgument(numCounters >= 0);
Preconditions.checkArgument(numGauges >= 0);
Preconditions.checkArgument(numMeters >= 0);
Preconditions.checkArgument(numHistograms >= 0);
this.serializedMetrics = serializedMetrics;
this.serializedCounters = serializedCounters;
this.serializedGauges = serializedGauges;
this.serializedMeters = serializedMeters;
this.serializedHistograms = serializedHistograms;
this.numCounters = numCounters;
this.numGauges = numGauges;
this.numMeters = numMeters;
Expand All @@ -102,7 +121,10 @@ public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int
*/
public static class MetricDumpSerializer {

private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

/**
* Serializes the given metrics and returns the resulting byte array.
Expand All @@ -126,53 +148,66 @@ public MetricSerializationResult serialize(
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

buffer.clear();

countersBuffer.clear();
int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
try {
serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numCounters++;
} catch (Exception e) {
LOG.debug("Failed to serialize counter.", e);
}
}

gaugesBuffer.clear();
int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
try {
serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numGauges++;
} catch (Exception e) {
LOG.debug("Failed to serialize gauge.", e);
}
}

histogramsBuffer.clear();
int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
try {
serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numHistograms++;
} catch (Exception e) {
LOG.debug("Failed to serialize histogram.", e);
}
}

metersBuffer.clear();
int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
try {
serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
numMeters++;
} catch (Exception e) {
LOG.debug("Failed to serialize meter.", e);
}
}

return new MetricSerializationResult(buffer.getCopyOfBuffer(), numCounters, numGauges, numMeters, numHistograms);
return new MetricSerializationResult(
countersBuffer.getCopyOfBuffer(),
gaugesBuffer.getCopyOfBuffer(),
metersBuffer.getCopyOfBuffer(),
histogramsBuffer.getCopyOfBuffer(),
numCounters,
numGauges,
numMeters,
numHistograms);
}

public void close() {
buffer = null;
countersBuffer = null;
gaugesBuffer = null;
metersBuffer = null;
histogramsBuffer = null;
}
}

Expand Down Expand Up @@ -280,39 +315,42 @@ public static class MetricDumpDeserializer {
* @return A list containing the deserialized metrics.
*/
public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
DataInputView in = new DataInputDeserializer(data.serializedMetrics, 0, data.serializedMetrics.length);
DataInputView countersInputView = new DataInputDeserializer(data.serializedCounters, 0, data.serializedCounters.length);
DataInputView gaugesInputView = new DataInputDeserializer(data.serializedGauges, 0, data.serializedGauges.length);
DataInputView metersInputView = new DataInputDeserializer(data.serializedMeters, 0, data.serializedMeters.length);
DataInputView histogramsInputView = new DataInputDeserializer(data.serializedHistograms, 0, data.serializedHistograms.length);

List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numMeters + data.numHistograms);

for (int x = 0; x < data.numCounters; x++) {
try {
metrics.add(deserializeCounter(in));
metrics.add(deserializeCounter(countersInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize counter.", e);
}
}

for (int x = 0; x < data.numGauges; x++) {
try {
metrics.add(deserializeGauge(in));
metrics.add(deserializeGauge(gaugesInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize gauge.", e);
}
}

for (int x = 0; x < data.numHistograms; x++) {
for (int x = 0; x < data.numMeters; x++) {
try {
metrics.add(deserializeHistogram(in));
metrics.add(deserializeMeter(metersInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize histogram.", e);
LOG.debug("Failed to deserialize meter.", e);
}
}

for (int x = 0; x < data.numMeters; x++) {
for (int x = 0; x < data.numHistograms; x++) {
try {
metrics.add(deserializeMeter(in));
metrics.add(deserializeHistogram(histogramsInputView));
} catch (Exception e) {
LOG.debug("Failed to deserialize meter.", e);
LOG.debug("Failed to deserialize histogram.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MetricQueryService extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
private static final String SIZE_EXCEEDED_LOG_TEMPLATE = "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

private static final CharacterFilter FILTER = new CharacterFilter() {
@Override
Expand Down Expand Up @@ -115,6 +116,9 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {
MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

dump = enforceSizeLimit(dump);

getSender().tell(dump, getSelf());
} else {
LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
Expand All @@ -125,6 +129,83 @@ public void onReceive(Object message) {
}
}

private MetricDumpSerialization.MetricSerializationResult enforceSizeLimit(
MetricDumpSerialization.MetricSerializationResult serializationResult) {

int currentLength = 0;
boolean hasExceededBefore = false;

byte[] serializedCounters = serializationResult.serializedCounters;
int numCounters = serializationResult.numCounters;
if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedCounters.length)) {
logDumpSizeWouldExceedLimit("Counters", hasExceededBefore);
hasExceededBefore = true;

serializedCounters = new byte[0];
numCounters = 0;
} else {
currentLength += serializedCounters.length;
}

byte[] serializedMeters = serializationResult.serializedMeters;
int numMeters = serializationResult.numMeters;
if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedMeters.length)) {
logDumpSizeWouldExceedLimit("Meters", hasExceededBefore);
hasExceededBefore = true;

serializedMeters = new byte[0];
numMeters = 0;
} else {
currentLength += serializedMeters.length;
}

byte[] serializedGauges = serializationResult.serializedGauges;
int numGauges = serializationResult.numGauges;
if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedGauges.length)) {
logDumpSizeWouldExceedLimit("Gauges", hasExceededBefore);
hasExceededBefore = true;

serializedGauges = new byte[0];
numGauges = 0;
} else {
currentLength += serializedGauges.length;
}

byte[] serializedHistograms = serializationResult.serializedHistograms;
int numHistograms = serializationResult.numHistograms;
if (exceedsMessageSizeLimit(currentLength + serializationResult.serializedHistograms.length)) {
logDumpSizeWouldExceedLimit("Histograms", hasExceededBefore);
hasExceededBefore = true;

serializedHistograms = new byte[0];
numHistograms = 0;
}

return new MetricDumpSerialization.MetricSerializationResult(
serializedCounters,
serializedGauges,
serializedMeters,
serializedHistograms,
numCounters,
numGauges,
numMeters,
numHistograms);
}

private boolean exceedsMessageSizeLimit(final int currentSize) {
return currentSize > messageSizeLimit;
}

private void logDumpSizeWouldExceedLimit(final String metricType, boolean hasExceededBefore) {
if (LOG.isDebugEnabled()) {
LOG.debug(SIZE_EXCEEDED_LOG_TEMPLATE, metricType, messageSizeLimit);
} else {
if (!hasExceededBefore) {
LOG.info(SIZE_EXCEEDED_LOG_TEMPLATE, "Some metrics", messageSizeLimit);
}
}
}

/**
* Lightweight method to replace unsupported characters.
* If the string does not contain any unsupported characters, this method creates no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public Object getValue() {
Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());

// no metrics should be serialized
Assert.assertEquals(0, output.serializedMetrics.length);
Assert.assertEquals(0, output.serializedCounters.length);
Assert.assertEquals(0, output.serializedGauges.length);
Assert.assertEquals(0, output.serializedHistograms.length);
Assert.assertEquals(0, output.serializedMeters.length);

List<MetricDump> deserialized = deserializer.deserialize(output);
Assert.assertEquals(0, deserialized.size());
Expand Down Expand Up @@ -141,7 +144,8 @@ public long getCount() {
gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));

MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(counters, gauges, histograms, meters);
MetricDumpSerialization.MetricSerializationResult serialized = serializer.serialize(
counters, gauges, histograms, meters);
List<MetricDump> deserialized = deserializer.deserialize(serialized);

// ===== Counters ==============================================================================================
Expand Down
Loading

0 comments on commit 1cf6d30

Please sign in to comment.