From aef8effa08210a2938574363af54acd9010dd525 Mon Sep 17 00:00:00 2001 From: at055612 <22818309+at055612@users.noreply.github.com> Date: Tue, 14 Jan 2025 18:06:27 +0000 Subject: [PATCH] Change datafeed req logging, change receiptId format, fix gh-4695 --- .../core/receive/ReceiveDataModule.java | 2 + .../receive/ReceiveDataRequestHandler.java | 21 +- .../receive/StroomReceiptIdGenerator.java | 55 +++++ .../store/impl/DataUploadTaskHandler.java | 13 +- .../meta/api/StandardHeaderArguments.java | 2 + .../java/stroom/node/impl/NodeConfig.java | 11 +- .../stroom/pipeline/writer/CSVFormatter.java | 9 +- .../stroom/pipeline/writer/HTTPAppender.java | 38 +++- stroom-proxy/stroom-proxy-app/proxy-dev.yml | 11 - .../stroom-proxy-app/proxy-prod.yml.jinja2 | 1 + .../java/stroom/proxy/app/ProxyLifecycle.java | 2 +- .../java/stroom/proxy/app/SqsConnector.java | 4 +- .../stroom/proxy/app/event/EventConsumer.java | 2 +- .../proxy/app/event/EventResourceImpl.java | 2 +- .../proxy/app/event/EventSerialiser.java | 4 +- .../stroom/proxy/app/event/EventStore.java | 2 +- .../proxy/app/event/KafkaEventConsumer.java | 2 +- .../proxy/app/event/ReceiveDataHelper.java | 11 +- .../proxy/app/guice/ProxyCoreModule.java | 3 + .../proxy/app/handler/DropReceiver.java | 2 +- .../app/handler/ForwardHttpPostConfig.java | 1 + .../handler/ForwardHttpPostDestination.java | 4 +- .../proxy/app/handler/InstantForwardFile.java | 8 + .../stroom/proxy/app/handler/ProxyId.java | 13 +- ...ator.java => ProxyReceiptIdGenerator.java} | 22 +- .../app/handler/ProxyRequestHandler.java | 13 +- .../stroom/proxy/app/MockFileDestination.java | 4 +- .../stroom/proxy/app/MockHttpDestination.java | 2 +- .../java/stroom/proxy/app/PostDataHelper.java | 2 +- .../proxy/app/event/TestEventSerialiser.java | 6 +- .../proxy/app/event/TestEventStore.java | 6 +- .../stroom/proxy/app/handler/TestProxyId.java | 24 +++ .../java/stroom/proxy/repo/CSVFormatter.java | 8 +- .../java/stroom/proxy/repo/LogStream.java | 57 ++--- .../stroom/proxy/repo/LogStreamConfig.java | 46 ++-- .../receive/common/ReceiptIdGenerator.java | 11 + .../java/stroom/util/concurrent/UniqueId.java | 197 ++++++++++++++++++ .../util/concurrent/UniqueIdGenerator.java | 136 ++---------- .../stroom/util/concurrent/TestUniqueId.java | 46 ++++ .../concurrent/TestUniqueIdGenerator.java | 16 +- unreleased_changes/20250114_171914_349__0.md | 19 ++ unreleased_changes/20250114_172314_625__0.md | 19 ++ unreleased_changes/20250114_172612_413__0.md | 19 ++ .../20250114_175325_302__4695.md | 24 +++ unreleased_changes/20250114_175416_442__0.md | 19 ++ 45 files changed, 667 insertions(+), 252 deletions(-) create mode 100644 stroom-core/src/main/java/stroom/core/receive/StroomReceiptIdGenerator.java rename stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/{ReceiptIdGenerator.java => ProxyReceiptIdGenerator.java} (55%) create mode 100644 stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestProxyId.java create mode 100644 stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiptIdGenerator.java create mode 100644 stroom-util/src/main/java/stroom/util/concurrent/UniqueId.java create mode 100644 stroom-util/src/test/java/stroom/util/concurrent/TestUniqueId.java create mode 100644 unreleased_changes/20250114_171914_349__0.md create mode 100644 unreleased_changes/20250114_172314_625__0.md create mode 100644 unreleased_changes/20250114_172612_413__0.md create mode 100644 unreleased_changes/20250114_175325_302__4695.md create mode 100644 unreleased_changes/20250114_175416_442__0.md diff --git a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataModule.java b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataModule.java index e031b10c255..d4bea8ea866 100644 --- a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataModule.java +++ b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataModule.java @@ -17,6 +17,7 @@ package stroom.core.receive; import stroom.receive.common.FeedStatusService; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.RequestHandler; import com.google.inject.AbstractModule; @@ -27,5 +28,6 @@ public class ReceiveDataModule extends AbstractModule { protected void configure() { bind(RequestHandler.class).to(ReceiveDataRequestHandler.class); bind(FeedStatusService.class).to(FeedStatusServiceImpl.class); + bind(ReceiptIdGenerator.class).to(StroomReceiptIdGenerator.class).asEagerSingleton(); } } diff --git a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java index ccbbc177f1f..84a6a77bb03 100755 --- a/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java +++ b/stroom-core/src/main/java/stroom/core/receive/ReceiveDataRequestHandler.java @@ -24,6 +24,7 @@ import stroom.proxy.StroomStatusCode; import stroom.receive.common.AttributeMapFilter; import stroom.receive.common.AttributeMapValidator; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.RequestAuthenticator; import stroom.receive.common.RequestHandler; import stroom.receive.common.StreamTargetStreamHandlers; @@ -44,6 +45,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; import java.time.Instant; import java.util.List; import java.util.function.Consumer; @@ -64,6 +66,7 @@ class ReceiveDataRequestHandler implements RequestHandler { private final MetaService metaService; private final RequestAuthenticator requestAuthenticator; private final CertificateExtractor certificateExtractor; + private final ReceiptIdGenerator receiptIdGenerator; @Inject public ReceiveDataRequestHandler(final SecurityContext securityContext, @@ -72,7 +75,8 @@ public ReceiveDataRequestHandler(final SecurityContext securityContext, final TaskContextFactory taskContextFactory, final MetaService metaService, final RequestAuthenticator requestAuthenticator, - final CertificateExtractor certificateExtractor) { + final CertificateExtractor certificateExtractor, + final ReceiptIdGenerator receiptIdGenerator) { this.securityContext = securityContext; this.attributeMapFilterFactory = attributeMapFilterFactory; this.streamTargetStreamHandlerProvider = streamTargetStreamHandlerProvider; @@ -80,6 +84,7 @@ public ReceiveDataRequestHandler(final SecurityContext securityContext, this.metaService = metaService; this.requestAuthenticator = requestAuthenticator; this.certificateExtractor = certificateExtractor; + this.receiptIdGenerator = receiptIdGenerator; } @Override @@ -96,6 +101,12 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r // Validate the supplied attributes. AttributeMapValidator.validate(attributeMap, metaService::getTypes); + // Create a new receiptId for the request, so we can track progress and report back the + // receiptId to the sender + final String receiptId = receiptIdGenerator.generateId().toString(); + attributeMap.put(StandardHeaderArguments.RECEIPT_ID, receiptId); + attributeMap.appendItem(StandardHeaderArguments.RECEIPT_ID_PATH, receiptId); + final String feedName; if (attributeMapFilter.filter(attributeMap)) { debug("Receiving data", attributeMap); @@ -130,6 +141,14 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r // Set the response status. final StroomStatusCode stroomStatusCode = StroomStatusCode.OK; response.setStatus(stroomStatusCode.getHttpCode()); + + LOGGER.debug(() -> "Writing receipt id attribute to response: " + receiptId); + try (final PrintWriter writer = response.getWriter()) { + writer.println(receiptId); + } catch (final IOException e) { + LOGGER.error(e.getMessage(), e); + } + logSuccess(new StroomStreamStatus(stroomStatusCode, attributeMap)); }); } diff --git a/stroom-core/src/main/java/stroom/core/receive/StroomReceiptIdGenerator.java b/stroom-core/src/main/java/stroom/core/receive/StroomReceiptIdGenerator.java new file mode 100644 index 00000000000..812f62e4e1c --- /dev/null +++ b/stroom-core/src/main/java/stroom/core/receive/StroomReceiptIdGenerator.java @@ -0,0 +1,55 @@ +package stroom.core.receive; + +import stroom.node.api.NodeInfo; +import stroom.receive.common.ReceiptIdGenerator; +import stroom.util.NullSafe; +import stroom.util.concurrent.UniqueId; +import stroom.util.concurrent.UniqueId.NodeType; +import stroom.util.concurrent.UniqueIdGenerator; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import java.util.Objects; + +@Singleton +public class StroomReceiptIdGenerator implements ReceiptIdGenerator { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(StroomReceiptIdGenerator.class); + private static final String UNSAFE_CHARS_REGEX = "[^A-Za-z0-9-]"; + + private static final NodeType NODE_TYPE = NodeType.STROOM; + + private final UniqueIdGenerator uniqueIdGenerator; + + private final String nodeId; + + @Inject + public StroomReceiptIdGenerator(final NodeInfo nodeInfo) { + final String nodeName = nodeInfo.getThisNodeName(); + if (NullSafe.isBlankString(nodeName)) { + throw new IllegalArgumentException("nodeName is blank"); + } + this.nodeId = createSafeString(nodeName); + if (!Objects.equals(nodeName, nodeId)) { + LOGGER.info("Using nodeId '{}' for receiptId meta attribute as derived from nodeName '{}'", + nodeId, nodeName); + } + this.uniqueIdGenerator = new UniqueIdGenerator(NODE_TYPE, nodeId); + } + + @Override + public UniqueId generateId() { + return uniqueIdGenerator.generateId(); + } + + static String createSafeString(final String in) { + if (in == null) { + return null; + } else { + return in.replaceAll(UNSAFE_CHARS_REGEX, "-"); + } + } +} diff --git a/stroom-data/stroom-data-store-impl/src/main/java/stroom/data/store/impl/DataUploadTaskHandler.java b/stroom-data/stroom-data-store-impl/src/main/java/stroom/data/store/impl/DataUploadTaskHandler.java index 0b273248b75..139348a512d 100644 --- a/stroom-data/stroom-data-store-impl/src/main/java/stroom/data/store/impl/DataUploadTaskHandler.java +++ b/stroom-data/stroom-data-store-impl/src/main/java/stroom/data/store/impl/DataUploadTaskHandler.java @@ -20,6 +20,7 @@ import stroom.meta.api.AttributeMap; import stroom.meta.api.AttributeMapUtil; import stroom.meta.api.StandardHeaderArguments; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.StreamTargetStreamHandlers; import stroom.receive.common.StroomStreamProcessor; import stroom.security.api.SecurityContext; @@ -52,14 +53,17 @@ public class DataUploadTaskHandler { private final TaskContextFactory taskContextFactory; private final SecurityContext securityContext; private final StreamTargetStreamHandlers streamHandlers; + private final ReceiptIdGenerator receiptIdGenerator; @Inject DataUploadTaskHandler(final TaskContextFactory taskContextFactory, final SecurityContext securityContext, - final StreamTargetStreamHandlers streamHandlers) { + final StreamTargetStreamHandlers streamHandlers, + final ReceiptIdGenerator receiptIdGenerator) { this.taskContextFactory = taskContextFactory; this.securityContext = securityContext; this.streamHandlers = streamHandlers; + this.receiptIdGenerator = receiptIdGenerator; } public void uploadData(final String fileName, @@ -114,6 +118,11 @@ private void uploadData(final TaskContext taskContext, attributeMap.putDateTime(StandardHeaderArguments.RECEIVED_TIME_HISTORY, receivedTime); attributeMap.put(StandardHeaderArguments.USER_AGENT, "STROOM-UI"); attributeMap.put("UploadedBy", securityContext.getUserIdentityForAudit()); + // Create a new receiptId for the request, so we can track progress and report back the + // receiptId to the sender + final String receiptId = receiptIdGenerator.generateId().toString(); + attributeMap.put(StandardHeaderArguments.RECEIPT_ID, receiptId); + attributeMap.appendItem(StandardHeaderArguments.RECEIPT_ID_PATH, receiptId); final Consumer progressHandler = new TaskProgressHandler(taskContext, "Uploading"); @@ -122,7 +131,7 @@ private void uploadData(final TaskContext taskContext, uploadZipFile(file, feedName, typeName, attributeMap, progressHandler); } else if (name.endsWith(FILE_SEPARATOR + StandardHeaderArguments.COMPRESSION_GZIP) || - name.endsWith(FILE_SEPARATOR + GZ)) { + name.endsWith(FILE_SEPARATOR + GZ)) { attributeMap.put(StandardHeaderArguments.COMPRESSION, StandardHeaderArguments.COMPRESSION_GZIP); uploadStreamFile(file, feedName, typeName, attributeMap, progressHandler); diff --git a/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/StandardHeaderArguments.java b/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/StandardHeaderArguments.java index 752493a47fb..75fe71035a0 100644 --- a/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/StandardHeaderArguments.java +++ b/stroom-meta/stroom-meta-api/src/main/java/stroom/meta/api/StandardHeaderArguments.java @@ -73,6 +73,8 @@ public interface StandardHeaderArguments { String FEED = "Feed"; String TYPE = "Type"; + String SYSTEM = "System"; + String ENVIRONMENT = "Environment"; // Typically added in by nginx String X_FORWARDED_FOR = "X-Forwarded-For"; diff --git a/stroom-node/stroom-node-impl/src/main/java/stroom/node/impl/NodeConfig.java b/stroom-node/stroom-node-impl/src/main/java/stroom/node/impl/NodeConfig.java index 29d9f9adac4..af432acd501 100644 --- a/stroom-node/stroom-node-impl/src/main/java/stroom/node/impl/NodeConfig.java +++ b/stroom-node/stroom-node-impl/src/main/java/stroom/node/impl/NodeConfig.java @@ -48,9 +48,12 @@ public NodeDbConfig getDbConfig() { @NotNull @ReadOnly - @JsonPropertyDescription("The name of the node to identify it in the cluster. " + + @JsonPropertyDescription( + "The name of the node to identify it in the cluster. " + "Should only be set per node in the application YAML config file. The node name should not " + - "be changed once set.") + "be changed once set. This node name will be used in the 'receiptId' meta attribute so " + + "if a stroom cluster is forwarding to another stroom cluster, then the " + + "node name should be unique across all clusters involved.") @JsonProperty(PROP_NAME_NAME) public String getNodeName() { return nodeName; @@ -64,8 +67,8 @@ public StatusConfig getStatusConfig() { @Override public String toString() { return "NodeConfig{" + - "nodeName='" + nodeName + '\'' + - '}'; + "nodeName='" + nodeName + '\'' + + '}'; } @BootStrapConfig diff --git a/stroom-pipeline/src/main/java/stroom/pipeline/writer/CSVFormatter.java b/stroom-pipeline/src/main/java/stroom/pipeline/writer/CSVFormatter.java index 09806ed2582..6c7bf9a3b57 100644 --- a/stroom-pipeline/src/main/java/stroom/pipeline/writer/CSVFormatter.java +++ b/stroom-pipeline/src/main/java/stroom/pipeline/writer/CSVFormatter.java @@ -7,6 +7,7 @@ import java.util.regex.Pattern; public class CSVFormatter { + private static final String COMMA = ","; private static final String QUOTE = "\""; private static final String DOUBLE_QUOTE = "\"\""; @@ -15,9 +16,11 @@ public class CSVFormatter { private static final String ESCAPED_EQUALS = "\\="; private static final Pattern EQUALS_PATTERN = Pattern.compile(EQUALS); - public static String format(final Map map) { + public static String format(final Map map, final boolean sortByKey) { final List keys = new ArrayList<>(map.keySet()); - Collections.sort(keys); + if (sortByKey) { + Collections.sort(keys); + } final StringBuilder sb = new StringBuilder(); for (final String key : keys) { @@ -31,7 +34,7 @@ public static String format(final Map map) { sb.append(COMMA); } - if (sb.length() > 0) { + if (!sb.isEmpty()) { sb.setLength(sb.length() - 1); } diff --git a/stroom-pipeline/src/main/java/stroom/pipeline/writer/HTTPAppender.java b/stroom-pipeline/src/main/java/stroom/pipeline/writer/HTTPAppender.java index 0180fe6d83c..37d1f4cad33 100644 --- a/stroom-pipeline/src/main/java/stroom/pipeline/writer/HTTPAppender.java +++ b/stroom-pipeline/src/main/java/stroom/pipeline/writer/HTTPAppender.java @@ -48,6 +48,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -99,6 +101,7 @@ public class HTTPAppender extends AbstractAppender { private static final Set VALID_REQUEST_METHODS = Set.of( "GET", "POST", "HEAD", "OPTIONS", "PUT", "DELETE", "TRACE"); + private static final String META_KEYS_DEFAULT = "guid,receiptid,feed,system,environment,remotehost,remoteaddress"; private final MetaDataHolder metaDataHolder; private final TempDirProvider tempDirProvider; @@ -109,7 +112,7 @@ public class HTTPAppender extends AbstractAppender { private Long connectionTimeout; private Long readTimeout; private Long forwardChunkSize; - private Set metaKeySet = getMetaKeySet("guid,feed,system,environment,remotehost,remoteaddress"); + private Set metaKeySet = getMetaKeySet(META_KEYS_DEFAULT); private final OutputFactory outputStreamSupport; private long startTimeMs; @@ -541,11 +544,10 @@ private void log(final Logger logger, final int responseCode, final long bytes, final long duration) { - if (logger.isInfoEnabled() && !metaKeySet.isEmpty()) { - final Map filteredMap = attributeMap.entrySet().stream() - .filter(entry -> metaKeySet.contains(entry.getKey().toLowerCase())) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - final String kvPairs = CSVFormatter.format(filteredMap); + + if (logger.isInfoEnabled()) { + final Map filteredMap = filterAttributes(attributeMap); + final String kvPairs = CSVFormatter.format(filteredMap, false); final String message = CSVFormatter.escape(type) + "," + CSVFormatter.escape(url) + @@ -561,13 +563,29 @@ private void log(final Logger logger, } } + public Map filterAttributes(final AttributeMap attributeMap) { + // Use a LinkedHashMap to adhere to metaKeySet order, which is a LinkedHashSet + if (NullSafe.hasItems(metaKeySet)) { + final Map map = new LinkedHashMap<>(metaKeySet.size()); + metaKeySet.forEach(key -> + map.put(key, attributeMap.get(key))); + return map; + } else { + return Collections.emptyMap(); + } + } + private Set getMetaKeySet(final String csv) { if (NullSafe.isEmptyString(csv)) { return Collections.emptySet(); + } else { + // Use LinkedHashSet to preserve order + return Arrays.stream(csv.split(",")) + .map(String::trim) + .filter(NullSafe::isNonBlankString) + .map(String::toLowerCase) + .collect(Collectors.toCollection(LinkedHashSet::new)); } - - return Arrays.stream(csv.toLowerCase().split(",")) - .collect(Collectors.toSet()); } private static Set getActiveSSLProtocols() { @@ -677,7 +695,7 @@ public void setCompressionMethod(final String compressionMethod) { @PipelineProperty( description = "Specifies Which meta data keys will have their values logged in the send log. A Comma " + "delimited string of keys.", - defaultValue = "guid,feed,system,environment,remotehost,remoteaddress", + defaultValue = META_KEYS_DEFAULT, displayPriority = 12) public void setLogMetaKeys(final String string) { metaKeySet = getMetaKeySet(string); diff --git a/stroom-proxy/stroom-proxy-app/proxy-dev.yml b/stroom-proxy/stroom-proxy-app/proxy-dev.yml index 1cab2483842..0acd9cc7737 100644 --- a/stroom-proxy/stroom-proxy-app/proxy-dev.yml +++ b/stroom-proxy/stroom-proxy-app/proxy-dev.yml @@ -131,17 +131,6 @@ proxyConfig: maxRetries: 3 apiKey: "" - logStream: - metaKeys: - - "guid" - - "feed" - - "system" - - "environment" - - "remotehost" - - "remoteaddress" - - "remotedn" - - "remotecertexpiry" - contentDir: "content" # contentSync: # contentSyncEnabled: false diff --git a/stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2 b/stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2 index b1d37fc176a..b44d0bca51c 100644 --- a/stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2 +++ b/stroom-proxy/stroom-proxy-app/proxy-prod.yml.jinja2 @@ -186,6 +186,7 @@ proxyConfig: logStream: metaKeys: - "guid" + - "receiptid" - "feed" - "system" - "environment" diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java index b8b6859d250..1bc22f79ee4 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/ProxyLifecycle.java @@ -2,8 +2,8 @@ import stroom.proxy.app.event.EventStore; import stroom.proxy.app.event.EventStoreConfig; -import stroom.proxy.app.handler.ReceiptIdGenerator; import stroom.proxy.repo.ProxyServices; +import stroom.receive.common.ReceiptIdGenerator; import io.dropwizard.lifecycle.Managed; import jakarta.inject.Inject; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/SqsConnector.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/SqsConnector.java index ded02e40cbb..22d04833798 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/SqsConnector.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/SqsConnector.java @@ -3,9 +3,9 @@ import stroom.meta.api.AttributeMap; import stroom.meta.api.StandardHeaderArguments; import stroom.proxy.app.event.EventStore; -import stroom.proxy.app.handler.ReceiptIdGenerator; +import stroom.receive.common.ReceiptIdGenerator; import stroom.util.NullSafe; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventConsumer.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventConsumer.java index 6b89f5a87db..9fb495dba3c 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventConsumer.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventConsumer.java @@ -1,7 +1,7 @@ package stroom.proxy.app.event; import stroom.meta.api.AttributeMap; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; public interface EventConsumer { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventResourceImpl.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventResourceImpl.java index 5e14f2f43c2..305fa6a2dba 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventResourceImpl.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventResourceImpl.java @@ -17,7 +17,7 @@ package stroom.proxy.app.event; import stroom.meta.api.AttributeMap; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import jakarta.inject.Inject; import jakarta.inject.Singleton; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventSerialiser.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventSerialiser.java index 92d4d18203b..c3ba8269643 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventSerialiser.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventSerialiser.java @@ -3,7 +3,7 @@ import stroom.meta.api.AttributeMap; import stroom.proxy.app.event.model.Event; import stroom.proxy.app.event.model.Header; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.date.DateUtil; import stroom.util.json.JsonUtil; @@ -30,7 +30,7 @@ public String serialise(final UniqueId receiptId, final Event event = new Event( 0, receiptId.toString(), - receiptId.nodeId(), + receiptId.getNodeId(), feedKey.feed(), feedKey.type(), DateUtil.createNormalDateTimeString(), diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventStore.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventStore.java index 139df7eef98..19b78ca0793 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventStore.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/EventStore.java @@ -7,7 +7,7 @@ import stroom.proxy.repo.store.FileStores; import stroom.util.concurrent.ThreadUtil; import stroom.util.concurrent.UncheckedInterruptedException; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; import stroom.util.logging.Metrics; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/KafkaEventConsumer.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/KafkaEventConsumer.java index 8c7cc1a2291..7a29ba51973 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/KafkaEventConsumer.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/KafkaEventConsumer.java @@ -2,7 +2,7 @@ import stroom.meta.api.AttributeMap; import stroom.proxy.app.ProxyConfig; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java index e268cb0f373..901ef9a1b3d 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/event/ReceiveDataHelper.java @@ -5,17 +5,18 @@ import stroom.meta.api.StandardHeaderArguments; import stroom.proxy.StroomStatusCode; import stroom.proxy.app.handler.AttributeMapFilterFactory; -import stroom.proxy.app.handler.ReceiptIdGenerator; import stroom.proxy.repo.CSVFormatter; import stroom.proxy.repo.LogStream; import stroom.receive.common.AttributeMapFilter; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.RequestAuthenticator; import stroom.receive.common.StroomStreamException; import stroom.receive.common.StroomStreamStatus; import stroom.util.cert.CertificateExtractor; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; +import stroom.util.logging.LogUtil; import stroom.util.logging.Metrics; import jakarta.inject.Inject; @@ -91,9 +92,9 @@ public UniqueId process(final HttpServletRequest request, e, AttributeMapUtil.create(request, certificateExtractor)); final StroomStreamStatus status = stroomStreamException.getStroomStreamStatus(); - LOGGER.debug("\"handleException()\",{},\"{}\"", - CSVFormatter.format(status.getAttributeMap()), - CSVFormatter.escape(stroomStreamException.getMessage())); + LOGGER.debug(() -> LogUtil.message("\"handleException()\",{},\"{}\"", + CSVFormatter.format(status.getAttributeMap(), true), + CSVFormatter.escape(stroomStreamException.getMessage()))); final long duration = System.currentTimeMillis() - startTimeMs; if (StroomStatusCode.FEED_IS_NOT_SET_TO_RECEIVE_DATA.equals(status.getStroomStatusCode())) { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyCoreModule.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyCoreModule.java index 5b88e265ca4..c9fcf113921 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyCoreModule.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/guice/ProxyCoreModule.java @@ -17,6 +17,7 @@ import stroom.proxy.app.DataDirProviderImpl; import stroom.proxy.app.ProxyConfig; import stroom.proxy.app.handler.ProxyId; +import stroom.proxy.app.handler.ProxyReceiptIdGenerator; import stroom.proxy.app.handler.ProxyRequestHandler; import stroom.proxy.app.handler.ReceiverFactory; import stroom.proxy.app.handler.ReceiverFactoryProvider; @@ -27,6 +28,7 @@ import stroom.proxy.repo.ProgressLogImpl; import stroom.receive.common.DataReceiptPolicyAttributeMapFilterFactory; import stroom.receive.common.FeedStatusService; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.RemoteFeedModule; import stroom.receive.common.RequestHandler; import stroom.receive.rules.impl.DataReceiptPolicyAttributeMapFilterFactoryImpl; @@ -60,6 +62,7 @@ protected void configure() { install(new MockCollectionModule()); bind(ProxyId.class).asEagerSingleton(); + bind(ReceiptIdGenerator.class).to(ProxyReceiptIdGenerator.class).asEagerSingleton(); bind(BuildInfo.class).toProvider(BuildInfoProvider.class); bind(HttpClientFactory.class).to(DropwizardHttpClientFactory.class); bind(DataReceiptPolicyAttributeMapFilterFactory.class).to(DataReceiptPolicyAttributeMapFilterFactoryImpl.class); diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DropReceiver.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DropReceiver.java index 32c7cbd237f..da3fd4f65f8 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DropReceiver.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/DropReceiver.java @@ -46,7 +46,7 @@ public void receive(final Instant startTime, } } - LOGGER.warn("\"Dropped\",{}", CSVFormatter.format(attributeMap)); + LOGGER.warn("\"Dropped\",{}", CSVFormatter.format(attributeMap, true)); final Duration duration = Duration.between(startTime, Instant.now()); logStream.log( diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostConfig.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostConfig.java index d5d37ff8eb7..48df537fe96 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostConfig.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostConfig.java @@ -76,6 +76,7 @@ public ForwardHttpPostConfig(@JsonProperty("enabled") final boolean enabled, this.addOpenIdAccessToken = addOpenIdAccessToken; this.httpClient = Objects.requireNonNullElse(httpClient, HttpClientConfiguration .builder() + .timeout(DEFAULT_FORWARD_TIMEOUT) .connectionTimeout(DEFAULT_FORWARD_TIMEOUT) .connectionRequestTimeout(DEFAULT_FORWARD_TIMEOUT) .timeToLive(DEFAULT_FORWARD_TIMEOUT) diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostDestination.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostDestination.java index 6d75555ec76..1d907966d55 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostDestination.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardHttpPostDestination.java @@ -88,6 +88,7 @@ public void add(final Path sourceDir) { } private boolean forwardDir(final Path dir) { + LOGGER.debug("ForwardDir() - destinationName: {}, dir: {}", destinationName, dir); try { try { final FileGroup fileGroup = new FileGroup(dir); @@ -110,7 +111,8 @@ private boolean forwardDir(final Path dir) { } catch (final Exception e) { LOGGER.error(() -> - "Error sending '" + FileUtil.getCanonicalPath(dir) + "' to '" + destinationName + "'."); + "Error sending '" + FileUtil.getCanonicalPath(dir) + + "' to '" + destinationName + "'."); LOGGER.debug(e::getMessage, e); // Add to the errors diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/InstantForwardFile.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/InstantForwardFile.java index dd0360cecef..b81eedba66f 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/InstantForwardFile.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/InstantForwardFile.java @@ -71,6 +71,10 @@ public ReceiverFactory get(final ForwardFileConfig forwardFileConfig) { dropReceiver); } + + // -------------------------------------------------------------------------------- + + private static class InstantForwardFileReceiverFactory implements ReceiverFactory { private final AttributeMapFilter attributeMapFilter; @@ -95,6 +99,10 @@ public Receiver get(final AttributeMap attributeMap) { } } + + // -------------------------------------------------------------------------------- + + private static class InstantForwardFileReceiver implements Receiver { private final NumberedDirProvider receivingDirProvider; diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyId.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyId.java index 6b31fe2ad32..f193136620d 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyId.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyId.java @@ -22,6 +22,7 @@ public class ProxyId { private static final String ALLOWED_CHARS = "A-Za-z0-9-"; public static final String PROXY_ID_REGEX = "^[" + ALLOWED_CHARS + "]+$"; + public static final Pattern PROXY_ID_PATTERN = Pattern.compile(PROXY_ID_REGEX); private static final String UNSAFE_CHARS_REGEX = "[^" + ALLOWED_CHARS + "]"; private static final Logger LOGGER = LoggerFactory.getLogger(ProxyId.class); @@ -42,11 +43,11 @@ public ProxyId(final ProxyConfig proxyConfig, if (NullSafe.isBlankString(proxyId)) { LOGGER.info("No proxy id is configured"); final String storedId = readProxyId(); - if (NullSafe.isBlankString(storedId)) { + if (NullSafe.isBlankString(storedId) || !PROXY_ID_PATTERN.matcher(storedId).matches()) { final String createdId = createProxyId(); writeProxyId(createdId); - LOGGER.info("No stored proxy ID found in '{}', created and stored new proxy id: {}", - PROXY_ID_FILE, createdId); + LOGGER.info("No or invalid stored proxy ID '{}' found in '{}', created and stored new proxy id: {}", + storedId, PROXY_ID_FILE, createdId); id = createdId; source = "generated"; } else { @@ -60,7 +61,7 @@ public ProxyId(final ProxyConfig proxyConfig, source = "config"; } - if (!Pattern.compile(PROXY_ID_REGEX).matcher(id).matches()) { + if (!PROXY_ID_PATTERN.matcher(id).matches()) { throw new RuntimeException(LogUtil.message("Proxy ID '{}' (source: {}), does not match pattern '{}'", id, source, PROXY_ID_REGEX)); } @@ -80,7 +81,7 @@ private String readProxyId() { final Path path = getProxyIdFilePath(); if (Files.exists(path)) { try { - storedId = Files.readString(path); + storedId = Files.readString(path).trim(); } catch (final IOException e) { LOGGER.error(e.getMessage(), e); } @@ -111,7 +112,7 @@ private String createProxyId() { return PROXY_ID + UUID.randomUUID(); } - private String createSafeString(final String in) { + static String createSafeString(final String in) { if (in == null) { return null; } else { diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ReceiptIdGenerator.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java similarity index 55% rename from stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ReceiptIdGenerator.java rename to stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java index 56c53dd1a72..2d2a75f726c 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ReceiptIdGenerator.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyReceiptIdGenerator.java @@ -1,7 +1,9 @@ package stroom.proxy.app.handler; +import stroom.receive.common.ReceiptIdGenerator; +import stroom.util.concurrent.UniqueId; +import stroom.util.concurrent.UniqueId.NodeType; import stroom.util.concurrent.UniqueIdGenerator; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; @@ -11,28 +13,34 @@ import java.util.Objects; import java.util.function.Supplier; +/** + * {@link ReceiptIdGenerator} for a proxy node + */ @Singleton -public class ReceiptIdGenerator { +public class ProxyReceiptIdGenerator implements ReceiptIdGenerator { - private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ReceiptIdGenerator.class); + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ProxyReceiptIdGenerator.class); + + private static final NodeType NODE_TYPE = NodeType.PROXY; private final UniqueIdGenerator receiptIdGenerator; @Inject - public ReceiptIdGenerator(final ProxyId proxyId) { + public ProxyReceiptIdGenerator(final ProxyId proxyId) { LOGGER.info("Creating receiptIdGenerator for proxyId '{}'", proxyId); - receiptIdGenerator = new UniqueIdGenerator(proxyId.getId()); + receiptIdGenerator = new UniqueIdGenerator(NODE_TYPE, proxyId.getId()); } - public ReceiptIdGenerator(final Supplier nodeIdSupplier) { + public ProxyReceiptIdGenerator(final Supplier nodeIdSupplier) { final String nodeId = Objects.requireNonNull(nodeIdSupplier).get(); LOGGER.info("Creating receiptIdGenerator for proxyId '{}'", nodeId); - receiptIdGenerator = new UniqueIdGenerator(nodeId); + receiptIdGenerator = new UniqueIdGenerator(NODE_TYPE, nodeId); } /** * @return A globally unique ID. */ + @Override public UniqueId generateId() { return receiptIdGenerator.generateId(); } diff --git a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyRequestHandler.java b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyRequestHandler.java index 33fab8160fa..8f86f1020c5 100644 --- a/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyRequestHandler.java +++ b/stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ProxyRequestHandler.java @@ -4,11 +4,12 @@ import stroom.meta.api.AttributeMapUtil; import stroom.meta.api.StandardHeaderArguments; import stroom.proxy.StroomStatusCode; +import stroom.receive.common.ReceiptIdGenerator; import stroom.receive.common.RequestAuthenticator; import stroom.receive.common.RequestHandler; import stroom.receive.common.StroomStreamException; import stroom.util.cert.CertificateExtractor; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.date.DateUtil; import stroom.util.io.StreamUtil; import stroom.util.logging.LambdaLogger; @@ -61,14 +62,15 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r // Create attribute map from headers. final AttributeMap attributeMap = AttributeMapUtil.create(request, certificateExtractor); - // Create a new proxy id for the request, so we can track progress and report back the UUID to the sender, + // Create a new proxy id for the request, so we can track progress of the stream + // through the various proxies and into stroom and report back the ID to the sender, final UniqueId receiptId = receiptIdGenerator.generateId(); // Authorise request. requestAuthenticator.authenticate(request, attributeMap); final String receiptIdStr = receiptId.toString(); - LOGGER.debug("Adding proxy attribute {}: {}", StandardHeaderArguments.RECEIPT_ID, receiptIdStr); + LOGGER.debug("Adding meta attribute {}: {}", StandardHeaderArguments.RECEIPT_ID, receiptIdStr); attributeMap.put(StandardHeaderArguments.RECEIPT_ID, receiptIdStr); attributeMap.appendItem(StandardHeaderArguments.RECEIPT_ID_PATH, receiptIdStr); @@ -99,13 +101,12 @@ public void handle(final HttpServletRequest request, final HttpServletResponse r response.setStatus(HttpStatus.SC_OK); - LOGGER.debug(() -> "Writing proxy receipt id attribute to response: " + receiptId); + LOGGER.debug(() -> "Writing proxy receipt id attribute to response: " + receiptIdStr); try (final PrintWriter writer = response.getWriter()) { - writer.println(receiptId); + writer.println(receiptIdStr); } catch (final IOException e) { LOGGER.error(e.getMessage(), e); } - } catch (final StroomStreamException e) { e.sendErrorResponse(response); } diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockFileDestination.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockFileDestination.java index 1473d41d708..75a341fd811 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockFileDestination.java +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockFileDestination.java @@ -9,7 +9,7 @@ import stroom.proxy.repo.AggregatorConfig; import stroom.test.common.TestUtil; import stroom.util.NullSafe; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.date.DateUtil; import stroom.util.io.FileName; import stroom.util.io.PathCreator; @@ -164,7 +164,7 @@ private List getForwardFiles(final Config config) { } /** - * Assert all the {@link stroom.util.concurrent.UniqueIdGenerator.UniqueId}s contained in the stored aggregates + * Assert all the {@link UniqueId}s contained in the stored aggregates */ void assertReceiptIds(final Config config, final List expectedReceiptIds) { diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockHttpDestination.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockHttpDestination.java index 416e6ccb06f..12c66648052 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockHttpDestination.java +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/MockHttpDestination.java @@ -14,7 +14,7 @@ import stroom.receive.common.ReceiveDataServlet; import stroom.test.common.TestUtil; import stroom.util.NullSafe; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import stroom.util.date.DateUtil; import stroom.util.io.ByteCountInputStream; import stroom.util.io.FileName; diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/PostDataHelper.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/PostDataHelper.java index cc14ee713f4..7d318930fbd 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/PostDataHelper.java +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/PostDataHelper.java @@ -3,7 +3,7 @@ import stroom.proxy.app.handler.LocalByteBuffer; import stroom.proxy.app.handler.NumericFileNameUtil; import stroom.proxy.app.handler.ZipWriter; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; import com.google.common.base.Strings; import jakarta.ws.rs.client.Client; diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventSerialiser.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventSerialiser.java index 5050622c013..a1b2dc80bdb 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventSerialiser.java +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventSerialiser.java @@ -2,8 +2,8 @@ import stroom.meta.api.AttributeMap; import stroom.meta.api.StandardHeaderArguments; -import stroom.proxy.app.handler.ReceiptIdGenerator; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.proxy.app.handler.ProxyReceiptIdGenerator; +import stroom.util.concurrent.UniqueId; import org.junit.jupiter.api.Test; @@ -20,7 +20,7 @@ void test() throws IOException { attributeMap.put(StandardHeaderArguments.FEED, "test-feed"); final String data = "this\nis some data \n with new \n\n lines"; final EventSerialiser eventSerialiser = new EventSerialiser(); - final UniqueId receiptId = new ReceiptIdGenerator(() -> "test-proxy").generateId(); + final UniqueId receiptId = new ProxyReceiptIdGenerator(() -> "test-proxy").generateId(); final String json = eventSerialiser.serialise(receiptId, feedKey, attributeMap, data); assertThat(json).contains("\"this\\nis some data \\n with new \\n\\n lines\""); diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventStore.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventStore.java index 6f438d83d90..0485b0c54e4 100644 --- a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventStore.java +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/event/TestEventStore.java @@ -4,7 +4,8 @@ import stroom.proxy.app.DataDirProvider; import stroom.proxy.app.handler.ReceiverFactory; import stroom.proxy.repo.store.FileStores; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId; +import stroom.util.concurrent.UniqueId.NodeType; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -38,7 +39,8 @@ void test() throws IOException { attributeMap.put("Type", feedKey.type()); final UniqueId receiptId = new UniqueId( Instant.now().toEpochMilli(), - (short) i, + i, + NodeType.PROXY, "test-proxy"); eventStore.consume(attributeMap, receiptId, "test"); diff --git a/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestProxyId.java b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestProxyId.java new file mode 100644 index 00000000000..3b80fd46ae9 --- /dev/null +++ b/stroom-proxy/stroom-proxy-app/src/test/java/stroom/proxy/app/handler/TestProxyId.java @@ -0,0 +1,24 @@ +package stroom.proxy.app.handler; + +import stroom.test.common.TestUtil; + +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.util.stream.Stream; + +class TestProxyId { + + @TestFactory + Stream test() { + return TestUtil.buildDynamicTestStream() + .withInputAndOutputType(String.class) + .withSingleArgTestFunction(ProxyId::createSafeString) + .withSimpleEqualityAssertion() + .addCase("", "") + .addCase("foo_bar", "foo-bar") + .addCase("foo-bar", "foo-bar") + .addCase("foo-bar?()", "foo-bar---") + .build(); + } +} diff --git a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/CSVFormatter.java b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/CSVFormatter.java index 5e77afc6ebe..655def905ac 100644 --- a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/CSVFormatter.java +++ b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/CSVFormatter.java @@ -16,9 +16,11 @@ public class CSVFormatter { private static final String ESCAPED_EQUALS = "\\="; private static final Pattern EQUALS_PATTERN = Pattern.compile(EQUALS); - public static String format(final Map map) { + public static String format(final Map map, final boolean sortByKey) { final List keys = new ArrayList<>(map.keySet()); - Collections.sort(keys); + if (sortByKey) { + Collections.sort(keys); + } final StringBuilder sb = new StringBuilder(); for (final String key : keys) { @@ -32,7 +34,7 @@ public static String format(final Map map) { sb.append(COMMA); } - if (sb.length() > 0) { + if (!sb.isEmpty()) { sb.setLength(sb.length() - 1); } diff --git a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStream.java b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStream.java index c339b1b8e40..94e4011bafa 100644 --- a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStream.java +++ b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStream.java @@ -8,10 +8,10 @@ import jakarta.inject.Singleton; import org.slf4j.Logger; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; @Singleton public class LogStream { @@ -23,6 +23,19 @@ public LogStream(final Provider logStreamConfigProvider) { this.logStreamConfigProvider = logStreamConfigProvider; } + public Map filterAttributes(final AttributeMap attributeMap) { + // Use a LinkedHashMap to adhere to metaKeys order, which is a LinkedHashSet + final List metaKeys = logStreamConfigProvider.get().getMetaKeys(); + if (NullSafe.hasItems(metaKeys)) { + final Map map = new LinkedHashMap<>(metaKeys.size()); + metaKeys.forEach(key -> + map.put(key, attributeMap.get(key))); + return map; + } else { + return Collections.emptyMap(); + } + } + public void log(final Logger logger, final AttributeMap attributeMap, final String type, @@ -43,29 +56,23 @@ public void log(final Logger logger, final String message) { if (logger.isInfoEnabled()) { - final Set metaKeys = logStreamConfigProvider.get().getMetaKeys(); + final Map filteredAttributes = filterAttributes(attributeMap); - if (NullSafe.hasItems(metaKeys)) { - final Map filteredMap = attributeMap.entrySet() - .stream() - .filter(entry -> metaKeys.contains(entry.getKey().toLowerCase())) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - final String kvPairs = CSVFormatter.format(filteredMap); - final String logLine = CSVFormatter.escape(type) + - "," + - CSVFormatter.escape(url) + - "," + - responseCode + - "," + - bytes + - "," + - duration + - "," + - CSVFormatter.escape(message) + - "," + - kvPairs; - logger.info(logLine); - } + final String kvPairs = CSVFormatter.format(filteredAttributes, false); + final String logLine = CSVFormatter.escape(type) + + "," + + CSVFormatter.escape(url) + + "," + + responseCode + + "," + + bytes + + "," + + duration + + "," + + CSVFormatter.escape(message) + + "," + + kvPairs; + logger.info(logLine); } } } diff --git a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStreamConfig.java b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStreamConfig.java index 80d04b2382d..23a930a3955 100644 --- a/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStreamConfig.java +++ b/stroom-proxy/stroom-proxy-repo/src/main/java/stroom/proxy/repo/LogStreamConfig.java @@ -1,5 +1,7 @@ package stroom.proxy.repo; +import stroom.meta.api.StandardHeaderArguments; +import stroom.util.NullSafe; import stroom.util.shared.AbstractConfig; import stroom.util.shared.IsProxyConfig; @@ -7,43 +9,45 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.util.Collections; +import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.stream.Collectors; @JsonPropertyOrder(alphabetic = true) public class LogStreamConfig extends AbstractConfig implements IsProxyConfig { - private final SortedSet metaKeys; + private final List metaKeys; public LogStreamConfig() { - // SortedSet so the order the keys appear in the log entries can be controlled - this(Collections.unmodifiableSortedSet(new TreeSet<>(Set.of( - "guid", - "feed", - "system", - "environment", - "remotehost", - "remoteaddress", - "remotedn", - "remotecertexpiry")))); + // Linked + this(List.of( + StandardHeaderArguments.GUID, + StandardHeaderArguments.RECEIPT_ID, + StandardHeaderArguments.FEED, + StandardHeaderArguments.SYSTEM, + StandardHeaderArguments.ENVIRONMENT, + StandardHeaderArguments.REMOTE_HOST, + StandardHeaderArguments.REMOTE_ADDRESS, + StandardHeaderArguments.REMOTE_DN, + StandardHeaderArguments.REMOTE_CERT_EXPIRY)); } @SuppressWarnings("unused") @JsonCreator - public LogStreamConfig(@JsonProperty("metaKeys") final SortedSet metaKeys) { - this.metaKeys = Collections.unmodifiableSortedSet(metaKeys); + public LogStreamConfig(@JsonProperty("metaKeys") final List metaKeys) { + this.metaKeys = NullSafe.stream(metaKeys) + .distinct() + .collect(Collectors.toList()); } /** * Optional log line with header attributes output as defined by this property - * @return + * The headers attributes that will be output in log lines. They will be output in the + * order that they appear in this list. Duplicates will be ignored. */ @JsonProperty - public SortedSet getMetaKeys() { + public List getMetaKeys() { return metaKeys; } @@ -67,7 +71,7 @@ public int hashCode() { @Override public String toString() { return "LogStreamConfig{" + - "metaKeys='" + metaKeys + '\'' + - '}'; + "metaKeys='" + metaKeys + '\'' + + '}'; } } diff --git a/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiptIdGenerator.java b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiptIdGenerator.java new file mode 100644 index 00000000000..e0cbee9fc62 --- /dev/null +++ b/stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/ReceiptIdGenerator.java @@ -0,0 +1,11 @@ +package stroom.receive.common; + +import stroom.util.concurrent.UniqueId; + +/** + * Generates a {@link UniqueId} for all received items. + */ +public interface ReceiptIdGenerator { + + UniqueId generateId(); +} diff --git a/stroom-util/src/main/java/stroom/util/concurrent/UniqueId.java b/stroom-util/src/main/java/stroom/util/concurrent/UniqueId.java new file mode 100644 index 00000000000..748e5ddb0b6 --- /dev/null +++ b/stroom-util/src/main/java/stroom/util/concurrent/UniqueId.java @@ -0,0 +1,197 @@ +package stroom.util.concurrent; + +import stroom.util.NullSafe; +import stroom.util.logging.LogUtil; + +import com.google.common.base.Strings; + +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public final class UniqueId { + + public static final String UNIQUE_ID_DELIMITER = "_"; + public static final Pattern UNIQUE_ID_DELIMITER_PATTERN = Pattern.compile("_"); + + private static final int EPOCH_MS_DIGITS = String.valueOf(Long.MAX_VALUE).length(); + private static final int SEQUENCE_NO_DIGITS = String.valueOf(UniqueIdGenerator.MAX_SEQUENCE_NO).length(); + + // Cache 0-9 inc. in padded form as a minor optimisation + private static final String[] CACHED_SEQUENCE_NOS = IntStream.range(0, 10) + .boxed() + .map(i -> Strings.padStart(String.valueOf(i), SEQUENCE_NO_DIGITS, '0')) + .toArray(String[]::new); + + public static final Pattern UNIQUE_ID_PATTERN = Pattern.compile( + "^" + + "\\d{" + EPOCH_MS_DIGITS + "}" + + UNIQUE_ID_DELIMITER + + "\\d{" + SEQUENCE_NO_DIGITS + "}" + + UNIQUE_ID_DELIMITER + + "[A-Z]" + + UNIQUE_ID_DELIMITER + + UniqueIdGenerator.NODE_ID_BASE_REGEX + + "$"); + + private final long epochMs; + private final int sequenceNo; + private final NodeType nodeType; + private final String nodeId; + + public UniqueId(final long epochMs, + final int sequenceNo, + final NodeType nodeType, + final String nodeId) { + + if (sequenceNo > UniqueIdGenerator.MAX_SEQUENCE_NO) { + throw new IllegalArgumentException( + "sequenceNo cannot be greater than " + UniqueIdGenerator.MAX_SEQUENCE_NO); + } + if (NullSafe.isBlankString(nodeId)) { + throw new IllegalArgumentException("nodeId cannot be blank"); + } + + this.epochMs = epochMs; + this.sequenceNo = sequenceNo; + this.nodeType = Objects.requireNonNull(nodeType); + this.nodeId = nodeId; + } + + /** + * Parse a {@link UniqueId} from a string. + */ + public static UniqueId parse(final String uniqueIdStr) { + final String trimmed = NullSafe.trim(uniqueIdStr); + if (NullSafe.isEmptyString(trimmed)) { + return null; + } else { + if (!uniqueIdStr.contains(UNIQUE_ID_DELIMITER)) { + throw new IllegalArgumentException(LogUtil.message( + "Invalid uniqueIdStr '{}', no '{}' found", + uniqueIdStr, UNIQUE_ID_DELIMITER)); + } + final String[] parts = UNIQUE_ID_DELIMITER_PATTERN.split(trimmed); + if (parts.length != 4) { + throw new IllegalArgumentException(LogUtil.message( + "Invalid uniqueIdStr '{}', expecting four parts when splitting on '{}'", + trimmed, UNIQUE_ID_DELIMITER)); + } + final long epochMs = Long.parseLong(parts[0]); + final int sequenceNo = Integer.parseInt(parts[1]); + final NodeType nodeType = NodeType.fromString(parts[2]); + final String nodeId = parts[3]; + return new UniqueId(epochMs, sequenceNo, nodeType, nodeId); + } + } + + @Override + public String toString() { +// return toString(epochMs, sequenceNo, nodeType, nodeId); + // Minor optimisation as 0 will be used a lot, so have a hard coded zero string + final String sequenceNoStr = sequenceNo < 10 + ? CACHED_SEQUENCE_NOS[sequenceNo] + : Strings.padStart(String.valueOf(sequenceNo), SEQUENCE_NO_DIGITS, '0'); + + return Strings.padStart(String.valueOf(epochMs), EPOCH_MS_DIGITS, '0') + + UNIQUE_ID_DELIMITER + + sequenceNoStr + + UNIQUE_ID_DELIMITER + + nodeType.toString() + + UNIQUE_ID_DELIMITER + + nodeId; + } + + /** + * @return The time in millis since epoch that the ID was generated. + */ + public long getEpochMs() { + return epochMs; + } + + /** + * @return A Sequence number that is used to differentiate {@link UniqueId}s that + * are generated during the same millisecond. + */ + public int getSequenceNo() { + return sequenceNo; + } + + public NodeType getNodeType() { + return nodeType; + } + + /** + * @return The name/identifier for the node instance, e.g. in a cluster, + * each node generating {@link UniqueId}s must have a unique node ID. + */ + public String getNodeId() { + return nodeId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + var that = (UniqueId) obj; + return this.epochMs == that.epochMs && + this.sequenceNo == that.sequenceNo && + Objects.equals(this.nodeId, that.nodeId); + } + + @Override + public int hashCode() { + return Objects.hash(epochMs, sequenceNo, nodeId); + } + + + // -------------------------------------------------------------------------------- + + + /** + * Whether the node is a stroom-proxy or stroom node. + */ + public enum NodeType { + PROXY("P"), + STROOM("S"), + ; + + private final String displayValue; + private static final Map DISPLAY_VALUE_TO_ENUM_MAP = Arrays.stream(NodeType.values()) + .collect(Collectors.toMap( + nodeType -> nodeType.toString().toUpperCase(), + Function.identity())); + + NodeType(final String displayValue) { + this.displayValue = displayValue; + } + + public static NodeType fromString(final String str) { + if (NullSafe.isBlankString(str)) { + throw new IllegalArgumentException("Blank string not a valid UniqueIdType"); + } + NodeType nodeType = DISPLAY_VALUE_TO_ENUM_MAP.get(str); + if (nodeType == null) { + nodeType = DISPLAY_VALUE_TO_ENUM_MAP.get(str.toUpperCase()); + } + if (nodeType == null) { + throw new IllegalArgumentException(LogUtil.message( + "Unable to parse {} to a UniqueIdType. Valid values are {}", str, NodeType.values())); + } + return nodeType; + } + + @Override + public String toString() { + return displayValue; + } + } +} diff --git a/stroom-util/src/main/java/stroom/util/concurrent/UniqueIdGenerator.java b/stroom-util/src/main/java/stroom/util/concurrent/UniqueIdGenerator.java index 2c234870907..6d8cb90f356 100644 --- a/stroom-util/src/main/java/stroom/util/concurrent/UniqueIdGenerator.java +++ b/stroom-util/src/main/java/stroom/util/concurrent/UniqueIdGenerator.java @@ -1,12 +1,10 @@ package stroom.util.concurrent; -import stroom.util.NullSafe; +import stroom.util.concurrent.UniqueId.NodeType; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; import stroom.util.logging.LogUtil; -import com.google.common.base.Strings; - import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; @@ -22,21 +20,22 @@ public class UniqueIdGenerator { private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(UniqueIdGenerator.class); - private static final String NODE_ID_BASE_REGEX = "[A-Za-z0-9][A-Za-z0-9-]+"; + static final int MAX_SEQUENCE_NO = 9_999; + static final String NODE_ID_BASE_REGEX = "[A-Za-z0-9-]+"; private static final Pattern NODE_ID_PATTERN = Pattern.compile("^" + NODE_ID_BASE_REGEX + "$"); - private static final short MAX_SEQUENCE_NO = 9_999; - + private final NodeType nodeType; private final String nodeId; private final AtomicReference stateRef = new AtomicReference<>(State.ofNow()); - public UniqueIdGenerator(final String nodeId) { + public UniqueIdGenerator(final NodeType nodeType, final String nodeId) { Objects.requireNonNull(nodeId); if (!NODE_ID_PATTERN.matcher(nodeId).matches()) { throw new IllegalArgumentException(LogUtil.message( "nodeId '{}' must match the pattern {}", nodeId, NODE_ID_PATTERN)); } + this.nodeType = nodeType; this.nodeId = nodeId; } @@ -44,15 +43,12 @@ public UniqueIdGenerator(final String nodeId) { * Each call will generate a new globally unique ID. */ public UniqueId generateId() { - State state = State.ofNow(); - state = stateRef.accumulateAndGet(state, (currState, newState) -> { - final long currEpochMs = currState.epochMs; - final long newEpochMs = newState.epochMs; - return newEpochMs > currEpochMs + final State state = stateRef.accumulateAndGet(State.ofNow(), (currState, newState) -> { + return newState.epochMs > currState.epochMs ? newState : currState.withNextSequenceNo(); }); - return new UniqueId(state.epochMs, state.sequenceNo, nodeId); + return new UniqueId(state.epochMs, state.sequenceNo, nodeType, nodeId); } @@ -60,10 +56,10 @@ public UniqueId generateId() { private record State(long epochMs, - short sequenceNo) { + int sequenceNo) { - private static final short INITIAL_SEQUENCE_NO = (short) 0; - private static final long PARK_NANOS = 50 * 1_000_000; // 100millis + private static final int INITIAL_SEQUENCE_NO = 0; + private static final long PARK_NANOS = 100_000; // 0.1 millis public static State ofNow() { return new State(System.currentTimeMillis(), INITIAL_SEQUENCE_NO); @@ -74,16 +70,15 @@ public static State ofNewTimestamp(long epochMs) { } public State withNextSequenceNo() { - int newSequenceNo = sequenceNo + 1; - if (newSequenceNo <= MAX_SEQUENCE_NO) { - return new State(epochMs, (short) (sequenceNo + 1)); + if (sequenceNo < MAX_SEQUENCE_NO) { + return new State(epochMs, sequenceNo + 1); } else { // Overflowed sequenceNo so wait for the next epochMs to roll round. - // very unlikely for us to need more than 10k IDs in a millisecond on + // Very unlikely for us to need more than 10k IDs in a millisecond on // one node. long newEpochMs = System.currentTimeMillis(); - LOGGER.info("About to loop, epochMs {}, newEpochMs {}", epochMs, newEpochMs); + LOGGER.debug("About to loop, epochMs {}, newEpochMs {}", epochMs, newEpochMs); while (newEpochMs <= epochMs) { LockSupport.parkNanos(PARK_NANOS); newEpochMs = System.currentTimeMillis(); @@ -92,103 +87,4 @@ public State withNextSequenceNo() { } } } - - - // -------------------------------------------------------------------------------- - - - public record UniqueId( - long epochMs, - short sequenceNo, - String nodeId) { - - public static final String UNIQUE_ID_DELIMITER = "_"; - public static final Pattern UNIQUE_ID_DELIMITER_PATTERN = Pattern.compile("_"); - - private static final int EPOCH_MS_DIGITS = String.valueOf(Long.MAX_VALUE).length(); - private static final int SEQUENCE_NO_DIGITS = String.valueOf(MAX_SEQUENCE_NO).length(); - private static final String ZERO_SEQUENCE_NO = Strings.padStart( - "0", SEQUENCE_NO_DIGITS, '0'); - - public static final Pattern UNIQUE_ID_PATTERN = Pattern.compile( - "^" - + "\\d{" + EPOCH_MS_DIGITS + "}" - + UNIQUE_ID_DELIMITER - + "\\d{" + SEQUENCE_NO_DIGITS + "}" - + UNIQUE_ID_DELIMITER - + NODE_ID_BASE_REGEX - + "$"); - - /** - * Parse a {@link UniqueId} from a string. - */ - public static UniqueId parse(final String uniqueIdStr) { - final String trimmed = NullSafe.trim(uniqueIdStr); - if (NullSafe.isEmptyString(trimmed)) { - return null; - } else { - if (!uniqueIdStr.contains(UNIQUE_ID_DELIMITER)) { - throw new IllegalArgumentException(LogUtil.message( - "Invalid uniqueIdStr '{}', no '{}' found", - uniqueIdStr, UNIQUE_ID_DELIMITER)); - } - final String[] parts = UNIQUE_ID_DELIMITER_PATTERN.split(trimmed); - if (parts.length != 3) { - throw new IllegalArgumentException(LogUtil.message( - "Invalid uniqueIdStr '{}', expecting three parts when splitting on '{}'", - trimmed, UNIQUE_ID_DELIMITER)); - } - final long epochMs = Long.parseLong(parts[0]); - final short sequenceNo = Short.parseShort(parts[1]); - final String nodeId = parts[2]; - return new UniqueId(epochMs, sequenceNo, nodeId); - } - } - - @Override - public String toString() { - return toString(epochMs, sequenceNo, nodeId); - } - - public String toString(long epochMs, - short sequenceNo, - String nodeId) { - // Minor optimisation as 0 will be used a lot, so have a hard coded zero string - final String sequenceNoStr = sequenceNo == 0 - ? ZERO_SEQUENCE_NO - : Strings.padStart(String.valueOf(sequenceNo), SEQUENCE_NO_DIGITS, '0'); - - return Strings.padStart(String.valueOf(epochMs), EPOCH_MS_DIGITS, '0') - + UNIQUE_ID_DELIMITER - + sequenceNoStr - + UNIQUE_ID_DELIMITER - + nodeId; - } - - /** - * @return The time in millis since epoch that the ID was generated. - */ - @Override - public long epochMs() { - return epochMs; - } - - /** - * @return A Sequence number that is used to differentiate {@link UniqueId}s that - * are generated during the same millisecond. - */ - @Override - public short sequenceNo() { - return sequenceNo; - } - - /** - * @return The name/identifier for the node instance, e.g. in a cluster, - * each node generating {@link UniqueId}s must have a unique node ID. - */ - @Override - public String nodeId() { - return nodeId; - } - } } diff --git a/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueId.java b/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueId.java new file mode 100644 index 00000000000..18b01199bf2 --- /dev/null +++ b/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueId.java @@ -0,0 +1,46 @@ +package stroom.util.concurrent; + +import stroom.util.concurrent.UniqueId.NodeType; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestUniqueId { + + @Test + void parse() { + final UniqueId uniqueId = new UniqueId(123456, 22, NodeType.PROXY, "node-abc"); + final String str = uniqueId.toString(); + final UniqueId uniqueId2 = UniqueId.parse(str); + Assertions.assertThat(uniqueId2) + .isEqualTo(uniqueId); + } + + @Test + void testToString() { + final UniqueId uniqueId = new UniqueId(123456, 22, NodeType.STROOM, "node-abc"); + final String str = uniqueId.toString(); + Assertions.assertThat(str) + .isEqualTo("0000000000000123456_0022_S_node-abc"); + } + + @Test + void testToString2() { + final UniqueId uniqueId = new UniqueId(123456, 5, NodeType.PROXY, "node-abc"); + final String str = uniqueId.toString(); + Assertions.assertThat(str) + .isEqualTo("0000000000000123456_0005_P_node-abc"); + } + + @Test + void testNodeType() { + Assertions.assertThat(NodeType.fromString("P")) + .isEqualTo(NodeType.PROXY); + Assertions.assertThat(NodeType.fromString("p")) + .isEqualTo(NodeType.PROXY); + Assertions.assertThat(NodeType.fromString("S")) + .isEqualTo(NodeType.STROOM); + Assertions.assertThat(NodeType.fromString("s")) + .isEqualTo(NodeType.STROOM); + } +} diff --git a/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueIdGenerator.java b/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueIdGenerator.java index c1f811e3e36..d9aff44e3eb 100644 --- a/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueIdGenerator.java +++ b/stroom-util/src/test/java/stroom/util/concurrent/TestUniqueIdGenerator.java @@ -1,6 +1,6 @@ package stroom.util.concurrent; -import stroom.util.concurrent.UniqueIdGenerator.UniqueId; +import stroom.util.concurrent.UniqueId.NodeType; import stroom.util.logging.DurationTimer; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; @@ -29,30 +29,30 @@ class TestUniqueIdGenerator { void testBadNodeId() { Assertions.assertThatThrownBy( () -> { - new UniqueIdGenerator("foo bar"); + new UniqueIdGenerator(NodeType.PROXY, "foo bar"); }).isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("must match the pattern"); } @Test void simple() { - UniqueIdGenerator generator = new UniqueIdGenerator("node1"); + UniqueIdGenerator generator = new UniqueIdGenerator(NodeType.PROXY, "node1"); ThreadUtil.sleep(20); final UniqueId uniqueId = generator.generateId(); final String str = uniqueId.toString(); LOGGER.info("uniqueId: {}", uniqueId); assertThat(str) - .contains(String.valueOf(uniqueId.epochMs())); + .contains(String.valueOf(uniqueId.getEpochMs())); assertThat(str) - .contains(String.valueOf(uniqueId.sequenceNo())); + .contains(String.valueOf(uniqueId.getSequenceNo())); assertThat(str) - .contains(uniqueId.nodeId()); + .contains(uniqueId.getNodeId()); } @Test void parse() { - UniqueIdGenerator generator = new UniqueIdGenerator("node1"); + UniqueIdGenerator generator = new UniqueIdGenerator(NodeType.PROXY, "node1"); final UniqueId uniqueId1 = generator.generateId(); final String str = uniqueId1.toString(); @@ -70,7 +70,7 @@ void multiThreadUniqueness() { final CountDownLatch startLatch = new CountDownLatch(cores); final CountDownLatch completionLatch = new CountDownLatch(cores); - final UniqueIdGenerator generator = new UniqueIdGenerator("node1"); + final UniqueIdGenerator generator = new UniqueIdGenerator(NodeType.PROXY, "node1"); int iterations = 100_000; final List[] lists = new List[cores]; diff --git a/unreleased_changes/20250114_171914_349__0.md b/unreleased_changes/20250114_171914_349__0.md new file mode 100644 index 00000000000..d72219a8a55 --- /dev/null +++ b/unreleased_changes/20250114_171914_349__0.md @@ -0,0 +1,19 @@ +* Change `receiptId` format to `__<(P|S)>_`. `P|S` represents stroom or Proxy. + + +```sh +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +``` diff --git a/unreleased_changes/20250114_172314_625__0.md b/unreleased_changes/20250114_172314_625__0.md new file mode 100644 index 00000000000..9f8b23050ca --- /dev/null +++ b/unreleased_changes/20250114_172314_625__0.md @@ -0,0 +1,19 @@ +* Change stroom to also set the `receiptId` meta attribute on receipt or upload. + + +```sh +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +``` diff --git a/unreleased_changes/20250114_172612_413__0.md b/unreleased_changes/20250114_172612_413__0.md new file mode 100644 index 00000000000..d9368c71f87 --- /dev/null +++ b/unreleased_changes/20250114_172612_413__0.md @@ -0,0 +1,19 @@ +* Change proxy logging to still log datafeed events even if the `metaKeys` config prop is empty. + + +```sh +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +``` diff --git a/unreleased_changes/20250114_175325_302__4695.md b/unreleased_changes/20250114_175325_302__4695.md new file mode 100644 index 00000000000..9245b455d3c --- /dev/null +++ b/unreleased_changes/20250114_175325_302__4695.md @@ -0,0 +1,24 @@ +* Issue **#4695** : Change proxy to re-create the proxy ID in proxy-id.txt if the value in there does not match the required pattern. + + +```sh +# ******************************************************************************** +# Issue title: proxyId regex needs changing for AWS nodes +# Issue link: https://github.com/gchq/stroom/issues/4695 +# ******************************************************************************** + +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +``` diff --git a/unreleased_changes/20250114_175416_442__0.md b/unreleased_changes/20250114_175416_442__0.md new file mode 100644 index 00000000000..1c9fae72ebd --- /dev/null +++ b/unreleased_changes/20250114_175416_442__0.md @@ -0,0 +1,19 @@ +* Fix the sleep time in UniqueIdGenerator (from 50ms to 0.1ms). + + +```sh +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +```