Skip to content

Commit

Permalink
Change datafeed req logging, change receiptId format, fix gh-4695
Browse files Browse the repository at this point in the history
  • Loading branch information
at055612 committed Jan 14, 2025
1 parent c29024b commit aef8eff
Show file tree
Hide file tree
Showing 45 changed files with 667 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package stroom.core.receive;

import stroom.receive.common.FeedStatusService;
import stroom.receive.common.ReceiptIdGenerator;
import stroom.receive.common.RequestHandler;

import com.google.inject.AbstractModule;
Expand All @@ -27,5 +28,6 @@ public class ReceiveDataModule extends AbstractModule {
protected void configure() {
bind(RequestHandler.class).to(ReceiveDataRequestHandler.class);
bind(FeedStatusService.class).to(FeedStatusServiceImpl.class);
bind(ReceiptIdGenerator.class).to(StroomReceiptIdGenerator.class).asEagerSingleton();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import stroom.proxy.StroomStatusCode;
import stroom.receive.common.AttributeMapFilter;
import stroom.receive.common.AttributeMapValidator;
import stroom.receive.common.ReceiptIdGenerator;
import stroom.receive.common.RequestAuthenticator;
import stroom.receive.common.RequestHandler;
import stroom.receive.common.StreamTargetStreamHandlers;
Expand All @@ -44,6 +45,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -64,6 +66,7 @@ class ReceiveDataRequestHandler implements RequestHandler {
private final MetaService metaService;
private final RequestAuthenticator requestAuthenticator;
private final CertificateExtractor certificateExtractor;
private final ReceiptIdGenerator receiptIdGenerator;

@Inject
public ReceiveDataRequestHandler(final SecurityContext securityContext,
Expand All @@ -72,14 +75,16 @@ public ReceiveDataRequestHandler(final SecurityContext securityContext,
final TaskContextFactory taskContextFactory,
final MetaService metaService,
final RequestAuthenticator requestAuthenticator,
final CertificateExtractor certificateExtractor) {
final CertificateExtractor certificateExtractor,
final ReceiptIdGenerator receiptIdGenerator) {
this.securityContext = securityContext;
this.attributeMapFilterFactory = attributeMapFilterFactory;
this.streamTargetStreamHandlerProvider = streamTargetStreamHandlerProvider;
this.taskContextFactory = taskContextFactory;
this.metaService = metaService;
this.requestAuthenticator = requestAuthenticator;
this.certificateExtractor = certificateExtractor;
this.receiptIdGenerator = receiptIdGenerator;
}

@Override
Expand All @@ -96,6 +101,12 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r
// Validate the supplied attributes.
AttributeMapValidator.validate(attributeMap, metaService::getTypes);

// Create a new receiptId for the request, so we can track progress and report back the
// receiptId to the sender
final String receiptId = receiptIdGenerator.generateId().toString();
attributeMap.put(StandardHeaderArguments.RECEIPT_ID, receiptId);
attributeMap.appendItem(StandardHeaderArguments.RECEIPT_ID_PATH, receiptId);

final String feedName;
if (attributeMapFilter.filter(attributeMap)) {
debug("Receiving data", attributeMap);
Expand Down Expand Up @@ -130,6 +141,14 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r
// Set the response status.
final StroomStatusCode stroomStatusCode = StroomStatusCode.OK;
response.setStatus(stroomStatusCode.getHttpCode());

LOGGER.debug(() -> "Writing receipt id attribute to response: " + receiptId);
try (final PrintWriter writer = response.getWriter()) {
writer.println(receiptId);
} catch (final IOException e) {
LOGGER.error(e.getMessage(), e);
}

logSuccess(new StroomStreamStatus(stroomStatusCode, attributeMap));
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package stroom.core.receive;

import stroom.node.api.NodeInfo;
import stroom.receive.common.ReceiptIdGenerator;
import stroom.util.NullSafe;
import stroom.util.concurrent.UniqueId;
import stroom.util.concurrent.UniqueId.NodeType;
import stroom.util.concurrent.UniqueIdGenerator;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.Objects;

@Singleton
public class StroomReceiptIdGenerator implements ReceiptIdGenerator {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(StroomReceiptIdGenerator.class);
private static final String UNSAFE_CHARS_REGEX = "[^A-Za-z0-9-]";

private static final NodeType NODE_TYPE = NodeType.STROOM;

private final UniqueIdGenerator uniqueIdGenerator;

private final String nodeId;

@Inject
public StroomReceiptIdGenerator(final NodeInfo nodeInfo) {
final String nodeName = nodeInfo.getThisNodeName();
if (NullSafe.isBlankString(nodeName)) {
throw new IllegalArgumentException("nodeName is blank");
}
this.nodeId = createSafeString(nodeName);
if (!Objects.equals(nodeName, nodeId)) {
LOGGER.info("Using nodeId '{}' for receiptId meta attribute as derived from nodeName '{}'",
nodeId, nodeName);
}
this.uniqueIdGenerator = new UniqueIdGenerator(NODE_TYPE, nodeId);
}

@Override
public UniqueId generateId() {
return uniqueIdGenerator.generateId();
}

static String createSafeString(final String in) {
if (in == null) {
return null;
} else {
return in.replaceAll(UNSAFE_CHARS_REGEX, "-");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import stroom.meta.api.AttributeMap;
import stroom.meta.api.AttributeMapUtil;
import stroom.meta.api.StandardHeaderArguments;
import stroom.receive.common.ReceiptIdGenerator;
import stroom.receive.common.StreamTargetStreamHandlers;
import stroom.receive.common.StroomStreamProcessor;
import stroom.security.api.SecurityContext;
Expand Down Expand Up @@ -52,14 +53,17 @@ public class DataUploadTaskHandler {
private final TaskContextFactory taskContextFactory;
private final SecurityContext securityContext;
private final StreamTargetStreamHandlers streamHandlers;
private final ReceiptIdGenerator receiptIdGenerator;

@Inject
DataUploadTaskHandler(final TaskContextFactory taskContextFactory,
final SecurityContext securityContext,
final StreamTargetStreamHandlers streamHandlers) {
final StreamTargetStreamHandlers streamHandlers,
final ReceiptIdGenerator receiptIdGenerator) {
this.taskContextFactory = taskContextFactory;
this.securityContext = securityContext;
this.streamHandlers = streamHandlers;
this.receiptIdGenerator = receiptIdGenerator;
}

public void uploadData(final String fileName,
Expand Down Expand Up @@ -114,6 +118,11 @@ private void uploadData(final TaskContext taskContext,
attributeMap.putDateTime(StandardHeaderArguments.RECEIVED_TIME_HISTORY, receivedTime);
attributeMap.put(StandardHeaderArguments.USER_AGENT, "STROOM-UI");
attributeMap.put("UploadedBy", securityContext.getUserIdentityForAudit());
// Create a new receiptId for the request, so we can track progress and report back the
// receiptId to the sender
final String receiptId = receiptIdGenerator.generateId().toString();
attributeMap.put(StandardHeaderArguments.RECEIPT_ID, receiptId);
attributeMap.appendItem(StandardHeaderArguments.RECEIPT_ID_PATH, receiptId);

final Consumer<Long> progressHandler = new TaskProgressHandler(taskContext, "Uploading");

Expand All @@ -122,7 +131,7 @@ private void uploadData(final TaskContext taskContext,
uploadZipFile(file, feedName, typeName, attributeMap, progressHandler);

} else if (name.endsWith(FILE_SEPARATOR + StandardHeaderArguments.COMPRESSION_GZIP) ||
name.endsWith(FILE_SEPARATOR + GZ)) {
name.endsWith(FILE_SEPARATOR + GZ)) {
attributeMap.put(StandardHeaderArguments.COMPRESSION, StandardHeaderArguments.COMPRESSION_GZIP);
uploadStreamFile(file, feedName, typeName, attributeMap, progressHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface StandardHeaderArguments {

String FEED = "Feed";
String TYPE = "Type";
String SYSTEM = "System";
String ENVIRONMENT = "Environment";

// Typically added in by nginx
String X_FORWARDED_FOR = "X-Forwarded-For";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ public NodeDbConfig getDbConfig() {

@NotNull
@ReadOnly
@JsonPropertyDescription("The name of the node to identify it in the cluster. " +
@JsonPropertyDescription(
"The name of the node to identify it in the cluster. " +
"Should only be set per node in the application YAML config file. The node name should not " +
"be changed once set.")
"be changed once set. This node name will be used in the 'receiptId' meta attribute so " +
"if a stroom cluster is forwarding to another stroom cluster, then the " +
"node name should be unique across all clusters involved.")
@JsonProperty(PROP_NAME_NAME)
public String getNodeName() {
return nodeName;
Expand All @@ -64,8 +67,8 @@ public StatusConfig getStatusConfig() {
@Override
public String toString() {
return "NodeConfig{" +
"nodeName='" + nodeName + '\'' +
'}';
"nodeName='" + nodeName + '\'' +
'}';
}

@BootStrapConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.regex.Pattern;

public class CSVFormatter {

private static final String COMMA = ",";
private static final String QUOTE = "\"";
private static final String DOUBLE_QUOTE = "\"\"";
Expand All @@ -15,9 +16,11 @@ public class CSVFormatter {
private static final String ESCAPED_EQUALS = "\\=";
private static final Pattern EQUALS_PATTERN = Pattern.compile(EQUALS);

public static String format(final Map<String, String> map) {
public static String format(final Map<String, String> map, final boolean sortByKey) {
final List<String> keys = new ArrayList<>(map.keySet());
Collections.sort(keys);
if (sortByKey) {
Collections.sort(keys);
}

final StringBuilder sb = new StringBuilder();
for (final String key : keys) {
Expand All @@ -31,7 +34,7 @@ public static String format(final Map<String, String> map) {
sb.append(COMMA);
}

if (sb.length() > 0) {
if (!sb.isEmpty()) {
sb.setLength(sb.length() - 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class HTTPAppender extends AbstractAppender {

private static final Set<String> VALID_REQUEST_METHODS = Set.of(
"GET", "POST", "HEAD", "OPTIONS", "PUT", "DELETE", "TRACE");
private static final String META_KEYS_DEFAULT = "guid,receiptid,feed,system,environment,remotehost,remoteaddress";

private final MetaDataHolder metaDataHolder;
private final TempDirProvider tempDirProvider;
Expand All @@ -109,7 +112,7 @@ public class HTTPAppender extends AbstractAppender {
private Long connectionTimeout;
private Long readTimeout;
private Long forwardChunkSize;
private Set<String> metaKeySet = getMetaKeySet("guid,feed,system,environment,remotehost,remoteaddress");
private Set<String> metaKeySet = getMetaKeySet(META_KEYS_DEFAULT);

private final OutputFactory outputStreamSupport;
private long startTimeMs;
Expand Down Expand Up @@ -541,11 +544,10 @@ private void log(final Logger logger,
final int responseCode,
final long bytes,
final long duration) {
if (logger.isInfoEnabled() && !metaKeySet.isEmpty()) {
final Map<String, String> filteredMap = attributeMap.entrySet().stream()
.filter(entry -> metaKeySet.contains(entry.getKey().toLowerCase()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
final String kvPairs = CSVFormatter.format(filteredMap);

if (logger.isInfoEnabled()) {
final Map<String, String> filteredMap = filterAttributes(attributeMap);
final String kvPairs = CSVFormatter.format(filteredMap, false);
final String message = CSVFormatter.escape(type) +
"," +
CSVFormatter.escape(url) +
Expand All @@ -561,13 +563,29 @@ private void log(final Logger logger,
}
}

public Map<String, String> filterAttributes(final AttributeMap attributeMap) {
// Use a LinkedHashMap to adhere to metaKeySet order, which is a LinkedHashSet
if (NullSafe.hasItems(metaKeySet)) {
final Map<String, String> map = new LinkedHashMap<>(metaKeySet.size());
metaKeySet.forEach(key ->
map.put(key, attributeMap.get(key)));
return map;
} else {
return Collections.emptyMap();
}
}

private Set<String> getMetaKeySet(final String csv) {
if (NullSafe.isEmptyString(csv)) {
return Collections.emptySet();
} else {
// Use LinkedHashSet to preserve order
return Arrays.stream(csv.split(","))
.map(String::trim)
.filter(NullSafe::isNonBlankString)
.map(String::toLowerCase)
.collect(Collectors.toCollection(LinkedHashSet::new));
}

return Arrays.stream(csv.toLowerCase().split(","))
.collect(Collectors.toSet());
}

private static Set<String> getActiveSSLProtocols() {
Expand Down Expand Up @@ -677,7 +695,7 @@ public void setCompressionMethod(final String compressionMethod) {
@PipelineProperty(
description = "Specifies Which meta data keys will have their values logged in the send log. A Comma " +
"delimited string of keys.",
defaultValue = "guid,feed,system,environment,remotehost,remoteaddress",
defaultValue = META_KEYS_DEFAULT,
displayPriority = 12)
public void setLogMetaKeys(final String string) {
metaKeySet = getMetaKeySet(string);
Expand Down
11 changes: 0 additions & 11 deletions stroom-proxy/stroom-proxy-app/proxy-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,6 @@ proxyConfig:
maxRetries: 3
apiKey: ""

logStream:
metaKeys:
- "guid"
- "feed"
- "system"
- "environment"
- "remotehost"
- "remoteaddress"
- "remotedn"
- "remotecertexpiry"

contentDir: "content"
# contentSync:
# contentSyncEnabled: false
Expand Down
1 change: 1 addition & 0 deletions stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ proxyConfig:
logStream:
metaKeys:
- "guid"
- "receiptid"
- "feed"
- "system"
- "environment"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import stroom.proxy.app.event.EventStore;
import stroom.proxy.app.event.EventStoreConfig;
import stroom.proxy.app.handler.ReceiptIdGenerator;
import stroom.proxy.repo.ProxyServices;
import stroom.receive.common.ReceiptIdGenerator;

import io.dropwizard.lifecycle.Managed;
import jakarta.inject.Inject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import stroom.meta.api.AttributeMap;
import stroom.meta.api.StandardHeaderArguments;
import stroom.proxy.app.event.EventStore;
import stroom.proxy.app.handler.ReceiptIdGenerator;
import stroom.receive.common.ReceiptIdGenerator;
import stroom.util.NullSafe;
import stroom.util.concurrent.UniqueIdGenerator.UniqueId;
import stroom.util.concurrent.UniqueId;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package stroom.proxy.app.event;

import stroom.meta.api.AttributeMap;
import stroom.util.concurrent.UniqueIdGenerator.UniqueId;
import stroom.util.concurrent.UniqueId;

public interface EventConsumer {

Expand Down
Loading

0 comments on commit aef8eff

Please sign in to comment.