Skip to content

Commit

Permalink
fix(consumer): validate timestamp not working
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Ke <[email protected]>
  • Loading branch information
jacobke committed Mar 1, 2021
1 parent af7ddd9 commit cbb07e9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package io.etrace.consumer.component.processor;

import io.etrace.agent.Trace;
import io.etrace.agent.config.AgentConfiguration;
import io.etrace.common.message.trace.CallStackV1;
import io.etrace.common.message.trace.MessageItem;
import io.etrace.common.message.trace.codec.JSONCodecV1;
import io.etrace.common.pipeline.Component;
import io.etrace.common.pipeline.Processor;
import io.etrace.common.pipeline.impl.DefaultAsyncTask;
import io.etrace.common.util.RequestIdHelper;
import io.etrace.common.util.TimeHelper;
import io.etrace.consumer.config.ConsumerProperties;
import io.etrace.consumer.metrics.MetricsService;
import io.etrace.consumer.model.MessageBlock;
import io.etrace.consumer.util.CallStackUtil;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import joptsimple.internal.Strings;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,7 +30,6 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -39,38 +39,21 @@
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TraceDecodeProcessor extends DefaultAsyncTask implements Processor {

protected static final String INVALID_MESSAGE = "invalid.message";

public final Logger LOGGER = LoggerFactory.getLogger(TraceDecodeProcessor.class);

@Autowired
public MetricsService metricsService;

@Autowired
protected ConsumerProperties consumerProperties;

public Timer hdfsTimer;
public Counter parseError;

public TraceDecodeProcessor(String name, Component component, Map<String, Object> params) {
super(name, component, params);
}

private static boolean validateCallStack(CallStackV1 callStack) {
if (callStack == null) {
return false;
} else if (Strings.isNullOrEmpty(callStack.getAppId())) {
return false;
} else if (Strings.isNullOrEmpty(callStack.getHostIp())) {
return false;
} else if (Strings.isNullOrEmpty(callStack.getHostName())) {
return false;
} else if (Strings.isNullOrEmpty(callStack.getRequestId())) {
return false;
} else if (Strings.isNullOrEmpty(callStack.getId())) {
return false;
} else {
return callStack.getMessage() != null;
}
}

@Override
public void startup() {
super.startup();
Expand All @@ -80,7 +63,7 @@ public void startup() {

@Override
public void processEvent(Object key, Object event) {
if (null == event || !(event instanceof MessageAndMetadata)) {
if (!(event instanceof MessageAndMetadata)) {
return;
}

Expand Down Expand Up @@ -120,11 +103,13 @@ public void writeToHDFS(List<MessageItem> items, byte[] body) {
MessageItem item = it.next();
CallStackV1 callStack = item.getCallStack();

if (CallStackUtil.validate(callStack)) {
// validate message. subclass can override validateMessage()
if (validateMessage(item)) {
CallStackUtil.removeClientAppId(callStack);
} else {
it.remove();
metricsService.invalidCallStack(CHECK_EXCEPTION, callStack.getAppId());
LOGGER.info("Discard MessageItem: AppId[{}], RequestId[{}], Host[{}], HostName[{}].",
callStack.getAppId(), callStack.getRequestId(), callStack.getHostIp(), callStack.getHostName());
}
}
if (!items.isEmpty()) {
Expand Down Expand Up @@ -152,27 +137,25 @@ public List<MessageItem> decode(byte[] compressData) throws IOException {
//set CallStack offset in message block
item.setOffset(offset);

// validate message. subclass can override validMessage()
if (validMessage(item)) {
messageItems.add(item);
}
messageItems.add(item);

offset += 4 + dataLen;
}
return messageItems;
}
}

public boolean validMessage(MessageItem item) {
boolean valid = validateCallStack(item.getCallStack());
if (!valid) {
Trace.newCounter(INVALID_MESSAGE)
.addTag("name", component.getName())
.addTag("type", "invalidCallstack")
.addTag("appId", item.getCallStack() == null ? "nullCallstack" :
Optional.ofNullable(item.getCallStack().getAppId()).orElse("nullAppId"))
.once();
protected boolean validateMessage(MessageItem item) {
long timestampInRequestId = RequestIdHelper.getTimestamp(item.getRequestId());
if (item.getCallStack() == null) {
metricsService.invalidCallStack(CHECK_DATA_INTEGRATION, "nullCallstack");
} else if (!CallStackUtil.validate(item.getCallStack())) {
metricsService.invalidCallStack(CHECK_DATA_INTEGRATION, item.getCallStack().getAppId());
} else if (!TimeHelper.isInPeriod(timestampInRequestId, 24 * consumerProperties.getKeeper(), 24)) {
metricsService.invalidCallStack(CHECK_TIMESTAMP, item.getCallStack().getAppId());
} else {
return true;
}
return valid;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public interface MetricName {
*/
String CALLSTACK_PARSE_ERROR = "callstack.parse.error";
String CALLSTACK_CHECK_INVALID = "callstack.check.invalid";
String CHECK_DATA_INTEGRATION = "check_data_integration";
String CHECK_TIMESTAMP = "check_timestamp";

String HDFS_ERROR = "hdfs.error";
String HDFS_THROUGHPUT = "hdfs.throughput";
String METRIC_NO_SAMPLING = "metric.no.sampling";
Expand All @@ -35,5 +38,4 @@ public interface MetricName {
String HBASE_PUT = "hbase.put";
String HBASE_FAIL = "hbase.fail";

String CHECK_EXCEPTION = "check";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static io.etrace.consumer.metrics.MetricName.*;

@Service
public class MetricsService {

private Cache<String, Counter> invalidCache;
private Map<String, Timer> hBaseWriteTimer;
private Map<String, Counter> hBaseWriteCounter;
private Counter hbaseWriteErrorCounter;
private final Cache<String, Counter> invalidCache;
private final Map<String, Timer> hBaseWriteTimer;
private final Map<String, Counter> hBaseWriteCounter;
private final Counter hbaseWriteErrorCounter;

public MetricsService() {
invalidCache = CacheBuilder.newBuilder().maximumSize(512).expireAfterAccess(5, TimeUnit.MINUTES).build();
Expand All @@ -45,6 +46,7 @@ public MetricsService() {
}

public void invalidCallStack(String type, String appId) {
appId = Optional.ofNullable(appId).orElse("nullAppId");
Counter counter = invalidCache.getIfPresent(type.concat("#").concat(appId));
if (null == counter) {
counter = Metrics.counter(CALLSTACK_CHECK_INVALID, Tags.of(Tag.of("type", type), Tag.of("agent", appId)));
Expand Down

0 comments on commit cbb07e9

Please sign in to comment.