Skip to content

Commit

Permalink
Log analytics messages (#9307)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter committed Oct 18, 2023
1 parent ab6c66b commit 27a23a3
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public ReplicationWorker create(final ReplicationInput replicationInput,

log.info("Setting up replication worker...");
final SyncPersistence syncPersistence = createSyncPersistence(syncPersistenceFactory, replicationInput, sourceLauncherConfig);
final AirbyteMessageTracker messageTracker = createMessageTracker(syncPersistence, featureFlags);
final AirbyteMessageTracker messageTracker = createMessageTracker(syncPersistence, featureFlags, replicationInput);

return createReplicationWorker(airbyteSource, airbyteDestination, messageTracker,
syncPersistence, recordSchemaValidator, fieldSelector, heartbeatTimeoutChaperone,
Expand Down Expand Up @@ -223,8 +223,10 @@ private static HeartbeatTimeoutChaperone createHeartbeatTimeoutChaperone(final H
* Create MessageTracker.
*/
private static AirbyteMessageTracker createMessageTracker(final SyncPersistence syncPersistence,
final FeatureFlags featureFlags) {
return new AirbyteMessageTracker(syncPersistence, featureFlags);
final FeatureFlags featureFlags,
final ReplicationInput replicationInput) {
return new AirbyteMessageTracker(syncPersistence, featureFlags, replicationInput.getSourceLauncherConfig().getDockerImage(),
replicationInput.getDestinationLauncherConfig().getDockerImage());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
Expand Down Expand Up @@ -243,4 +244,19 @@ public static AirbyteMessage createStatusTraceMessage(final StreamDescriptor str
.withTrace(airbyteTraceMessage);
}

public static AirbyteMessage createAnalyticsTraceMessage(final String type, final String value) {
final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage = new AirbyteAnalyticsTraceMessage()
.withType(type)
.withValue(value);

final AirbyteTraceMessage airbyteTraceMessage = new AirbyteTraceMessage()
.withEmittedAt(Long.valueOf(System.currentTimeMillis()).doubleValue())
.withType(AirbyteTraceMessage.Type.ANALYTICS)
.withAnalytics(airbyteAnalyticsTraceMessage);

return new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(airbyteTraceMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.airbyte.workers.internal.bookkeeping
import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.json.Jsons
import io.airbyte.config.FailureReason
import io.airbyte.protocol.models.AirbyteAnalyticsTraceMessage
import io.airbyte.protocol.models.AirbyteMessage
import io.airbyte.protocol.models.AirbyteTraceMessage
import io.airbyte.workers.helper.FailureHelper
Expand All @@ -21,6 +22,8 @@ private val logger = KotlinLogging.logger {}
class AirbyteMessageTracker(
val syncStatsTracker: SyncStatsTracker,
featureFlags: FeatureFlags,
private val sourceDockerImage: String,
private val destinationDockerImage: String,
) {
private val dstErrorTraceMsgs = ArrayList<AirbyteTraceMessage>()
private val srcErrorTraceMsgs = ArrayList<AirbyteTraceMessage>()
Expand Down Expand Up @@ -85,6 +88,7 @@ class AirbyteMessageTracker(
private fun handleEmittedTrace(msg: AirbyteTraceMessage, origin: AirbyteMessageOrigin): Unit = when (msg.type) {
AirbyteTraceMessage.Type.ESTIMATE -> syncStatsTracker.updateEstimates(msg.estimate)
AirbyteTraceMessage.Type.ERROR -> handleEmittedTraceError(msg, origin)
AirbyteTraceMessage.Type.ANALYTICS -> handleEmittedAnalyticsMessage(msg.analytics, origin)
AirbyteTraceMessage.Type.STREAM_STATUS -> logger.debug { "Stream status trace message not handled by message tracker: $msg" }
else -> logger.warn { "Invalid message type for trace message: $msg" }
}
Expand All @@ -97,6 +101,15 @@ class AirbyteMessageTracker(
}
}

/**
* Log analytics message - logs can be searched for certain events to analyze.
* This will be replaced by logic to collect the messages and attach them to the attempt summary in a subsequent PR.
*/
private fun handleEmittedAnalyticsMessage(msg: AirbyteAnalyticsTraceMessage, origin: AirbyteMessageOrigin) {
val dockerImage = if (origin == AirbyteMessageOrigin.SOURCE) sourceDockerImage else destinationDockerImage
logger.info { "$origin analytics [$dockerImage] | Type: ${msg.type} | Value: ${msg.value}" }
}

private fun logMsgAsJson(caller: String, msg: AirbyteMessage): Unit = when (logConnectorMsgs) {
true -> logger.info { "$caller message | ${Jsons.serialize(msg)}" }
else -> Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class AirbyteMessageTrackerTest {

@BeforeEach
void setup() {
this.messageTracker = new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags());
this.messageTracker =
new AirbyteMessageTracker(syncStatsTracker, new EnvVariableFeatureFlags(), "airbyte/source-image", "airbyte/destination-image");
}

@Test
Expand Down Expand Up @@ -108,6 +109,24 @@ void testAcceptFromDestinationTraceEstimate() {
verify(syncStatsTracker).updateEstimates(trace.getTrace().getEstimate());
}

@Test
void testAcceptFromDestinationTraceAnalytics() {
final AirbyteMessage trace = AirbyteMessageUtils.createAnalyticsTraceMessage("abc", "def");

messageTracker.acceptFromDestination(trace);

verifyNoInteractions(syncStatsTracker);
}

@Test
void testAcceptFromSourceTraceAnalytics() {
final AirbyteMessage trace = AirbyteMessageUtils.createAnalyticsTraceMessage("abc", "def");

messageTracker.acceptFromSource(trace);

verifyNoInteractions(syncStatsTracker);
}

@Test
void testAcceptFromDestinationTraceError() {
final AirbyteMessage trace = AirbyteMessageUtils.createStatusTraceMessage(mock(StreamDescriptor.class), Type.ERROR);
Expand Down
2 changes: 1 addition & 1 deletion deps.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[versions]
airbyte-protocol = "0.4.1"
airbyte-protocol = "0.4.2"
commons_io = "2.7"
connectors-testcontainers = "1.15.3"
datadog-version = "1.14.0"
Expand Down

0 comments on commit 27a23a3

Please sign in to comment.