From cbb07e9b7e926390be56203d4cac4ff826533c44 Mon Sep 17 00:00:00 2001 From: Jacob Ke Date: Mon, 1 Mar 2021 13:41:32 +0800 Subject: [PATCH] fix(consumer): validate timestamp not working Signed-off-by: Jacob Ke --- .../processor/TraceDecodeProcessor.java | 63 +++++++------------ .../etrace/consumer/metrics/MetricName.java | 4 +- .../consumer/metrics/MetricsService.java | 10 +-- 3 files changed, 32 insertions(+), 45 deletions(-) diff --git a/etrace-consumer/src/main/java/io/etrace/consumer/component/processor/TraceDecodeProcessor.java b/etrace-consumer/src/main/java/io/etrace/consumer/component/processor/TraceDecodeProcessor.java index a4bc045..08ffdc2 100644 --- a/etrace-consumer/src/main/java/io/etrace/consumer/component/processor/TraceDecodeProcessor.java +++ b/etrace-consumer/src/main/java/io/etrace/consumer/component/processor/TraceDecodeProcessor.java @@ -1,6 +1,5 @@ package io.etrace.consumer.component.processor; -import io.etrace.agent.Trace; import io.etrace.agent.config.AgentConfiguration; import io.etrace.common.message.trace.CallStackV1; import io.etrace.common.message.trace.MessageItem; @@ -8,6 +7,9 @@ import io.etrace.common.pipeline.Component; import io.etrace.common.pipeline.Processor; import io.etrace.common.pipeline.impl.DefaultAsyncTask; +import io.etrace.common.util.RequestIdHelper; +import io.etrace.common.util.TimeHelper; +import io.etrace.consumer.config.ConsumerProperties; import io.etrace.consumer.metrics.MetricsService; import io.etrace.consumer.model.MessageBlock; import io.etrace.consumer.util.CallStackUtil; @@ -15,7 +17,6 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; -import joptsimple.internal.Strings; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +30,6 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import static com.google.common.collect.Lists.newArrayList; @@ -39,13 +39,14 @@ @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class TraceDecodeProcessor extends DefaultAsyncTask implements Processor { - protected static final String INVALID_MESSAGE = "invalid.message"; - public final Logger LOGGER = LoggerFactory.getLogger(TraceDecodeProcessor.class); @Autowired public MetricsService metricsService; + @Autowired + protected ConsumerProperties consumerProperties; + public Timer hdfsTimer; public Counter parseError; @@ -53,24 +54,6 @@ public TraceDecodeProcessor(String name, Component component, Map items, byte[] body) { MessageItem item = it.next(); CallStackV1 callStack = item.getCallStack(); - if (CallStackUtil.validate(callStack)) { + // validate message. subclass can override validateMessage() + if (validateMessage(item)) { CallStackUtil.removeClientAppId(callStack); } else { it.remove(); - metricsService.invalidCallStack(CHECK_EXCEPTION, callStack.getAppId()); + LOGGER.info("Discard MessageItem: AppId[{}], RequestId[{}], Host[{}], HostName[{}].", + callStack.getAppId(), callStack.getRequestId(), callStack.getHostIp(), callStack.getHostName()); } } if (!items.isEmpty()) { @@ -152,10 +137,7 @@ public List decode(byte[] compressData) throws IOException { //set CallStack offset in message block item.setOffset(offset); - // validate message. subclass can override validMessage() - if (validMessage(item)) { - messageItems.add(item); - } + messageItems.add(item); offset += 4 + dataLen; } @@ -163,16 +145,17 @@ public List decode(byte[] compressData) throws IOException { } } - public boolean validMessage(MessageItem item) { - boolean valid = validateCallStack(item.getCallStack()); - if (!valid) { - Trace.newCounter(INVALID_MESSAGE) - .addTag("name", component.getName()) - .addTag("type", "invalidCallstack") - .addTag("appId", item.getCallStack() == null ? "nullCallstack" : - Optional.ofNullable(item.getCallStack().getAppId()).orElse("nullAppId")) - .once(); + protected boolean validateMessage(MessageItem item) { + long timestampInRequestId = RequestIdHelper.getTimestamp(item.getRequestId()); + if (item.getCallStack() == null) { + metricsService.invalidCallStack(CHECK_DATA_INTEGRATION, "nullCallstack"); + } else if (!CallStackUtil.validate(item.getCallStack())) { + metricsService.invalidCallStack(CHECK_DATA_INTEGRATION, item.getCallStack().getAppId()); + } else if (!TimeHelper.isInPeriod(timestampInRequestId, 24 * consumerProperties.getKeeper(), 24)) { + metricsService.invalidCallStack(CHECK_TIMESTAMP, item.getCallStack().getAppId()); + } else { + return true; } - return valid; + return false; } } diff --git a/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricName.java b/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricName.java index 66b706a..94dd908 100644 --- a/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricName.java +++ b/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricName.java @@ -22,6 +22,9 @@ public interface MetricName { */ String CALLSTACK_PARSE_ERROR = "callstack.parse.error"; String CALLSTACK_CHECK_INVALID = "callstack.check.invalid"; + String CHECK_DATA_INTEGRATION = "check_data_integration"; + String CHECK_TIMESTAMP = "check_timestamp"; + String HDFS_ERROR = "hdfs.error"; String HDFS_THROUGHPUT = "hdfs.throughput"; String METRIC_NO_SAMPLING = "metric.no.sampling"; @@ -35,5 +38,4 @@ public interface MetricName { String HBASE_PUT = "hbase.put"; String HBASE_FAIL = "hbase.fail"; - String CHECK_EXCEPTION = "check"; } diff --git a/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricsService.java b/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricsService.java index 2e808e7..eb8625f 100644 --- a/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricsService.java +++ b/etrace-consumer/src/main/java/io/etrace/consumer/metrics/MetricsService.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static io.etrace.consumer.metrics.MetricName.*; @@ -31,10 +32,10 @@ @Service public class MetricsService { - private Cache invalidCache; - private Map hBaseWriteTimer; - private Map hBaseWriteCounter; - private Counter hbaseWriteErrorCounter; + private final Cache invalidCache; + private final Map hBaseWriteTimer; + private final Map hBaseWriteCounter; + private final Counter hbaseWriteErrorCounter; public MetricsService() { invalidCache = CacheBuilder.newBuilder().maximumSize(512).expireAfterAccess(5, TimeUnit.MINUTES).build(); @@ -45,6 +46,7 @@ public MetricsService() { } public void invalidCallStack(String type, String appId) { + appId = Optional.ofNullable(appId).orElse("nullAppId"); Counter counter = invalidCache.getIfPresent(type.concat("#").concat(appId)); if (null == counter) { counter = Metrics.counter(CALLSTACK_CHECK_INVALID, Tags.of(Tag.of("type", type), Tag.of("agent", appId)));