Skip to content

Commit

Permalink
Bulk Load CDK: final micronaut properties cleanup (#52093)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jan 24, 2025
1 parent 57d8302 commit 293e8f3
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 35 deletions.
12 changes: 0 additions & 12 deletions airbyte-cdk/bulk/core/base/src/main/resources/application.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ data class Property(val micronautProperty: String, val environmentVariable: Stri
object EnvVarConstants {
val FILE_TRANSFER_ENABLED =
Property(
"airbyte.file-transfer.enabled",
"airbyte.destination.core.file-transfer.enabled",
"USE_FILE_TRANSFER",
)
val RECORD_BATCH_SIZE =
Property(
"airbyte.destination.record-batch-size-override",
"airbyte.destination.core.record-batch-size-override",
"AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE",
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SyncBeanFactory {
@Singleton
@Named("diskManager")
fun diskManager(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
@Value("\${airbyte.destination.core.resources.disk.bytes}") availableBytes: Long,
): ReservationManager {
return ReservationManager(availableBytes)
}
Expand All @@ -54,7 +54,7 @@ class SyncBeanFactory {
@Singleton
@Named("fileAggregateQueue")
fun fileAggregateQueue(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
@Value("\${airbyte.destination.core.resources.disk.bytes}") availableBytes: Long,
config: DestinationConfiguration,
catalog: DestinationCatalog
): MultiProducerChannel<FileAggregateMessage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ data object Undefined : DestinationMessage {
@Singleton
class DestinationMessageFactory(
private val catalog: DestinationCatalog,
@Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,
@Value("\${airbyte.destination.core.file-transfer.enabled}")
private val fileTransferEnabled: Boolean,
) {
fun fromAirbyteMessage(
message: AirbyteMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class DefaultDestinationTaskLauncher(
private val failSyncTaskFactory: FailSyncTaskFactory,

// File transfer
@Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean,
@Value("\${airbyte.destination.core.file-transfer.enabled}")
private val fileTransferEnabled: Boolean,

// Input Consumer requirements
private val inputFlow: ReservingDeserializingInputFlow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kotlinx.coroutines.channels.ClosedSendChannelException
@Singleton
@Secondary
class FlushTickTask(
@Value("\${airbyte.flush.rate-ms}") private val tickIntervalMs: Long,
@Value("\${airbyte.destination.core.flush.rate-ms}") private val tickIntervalMs: Long,
private val clock: Clock,
private val coroutineTimeUtils: TimeProvider,
private val catalog: DestinationCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class DefaultSpillToDiskTaskFactory(

@Singleton
class FileAccumulatorFactory(
@Value("\${airbyte.flush.window-ms}") private val windowWidthMs: Long,
@Value("\${airbyte.destination.core.flush.window-ms}") private val windowWidthMs: Long,
private val spillFileProvider: SpillFileProvider,
private val clock: Clock,
) {
Expand Down
13 changes: 13 additions & 0 deletions airbyte-cdk/bulk/core/load/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
airbyte:
destination:
core:
record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null}
file-transfer:
enabled: ${USE_FILE_TRANSFER:false}
staging-path: ${AIRBYTE_STAGING_DIRECTORY:/staging/files}
resources:
disk:
bytes: ${CONNECTOR_STORAGE_LIMIT_BYTES:5368709120} # 5GB
flush:
rate-ms: 900000 # 15 minutes
window-ms: 900000 # 15 minutes
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.initialization
package io.airbyte.cdk.load.initialization

import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
Expand Down Expand Up @@ -32,8 +32,8 @@ data class DefaultValueBean(
class TestFactory {
@Bean
fun defaultValueBean(
@Value("\${airbyte.file-transfer.staging-path}") stagingFolder: String,
@Value("\${airbyte.file-transfer.enabled}") fileTransferEnable: Boolean,
@Value("\${airbyte.destination.core.file-transfer.staging-path}") stagingFolder: String,
@Value("\${airbyte.destination.core.file-transfer.enabled}") fileTransferEnable: Boolean,
): DefaultValueBean {
return DefaultValueBean(stagingFolder, fileTransferEnable)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command.aws

import io.airbyte.cdk.load.command.Property

object AwsEnvVarConstants {
val ASSUME_ROLE_ACCESS_KEY = Property(ACCESS_KEY_PROPERTY, "AWS_ACCESS_KEY_ID")
val ASSUME_ROLE_SECRET_KEY = Property(SECRET_KEY_PROPERTY, "AWS_SECRET_ACCESS_KEY")
val ASSUME_ROLE_EXTERNAL_ID = Property(EXTERNAL_ID_PROPERTY, "AWS_ASSUME_ROLE_EXTERNAL_ID")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.airbyte.cdk.load.command.Property

fun AwsAssumeRoleCredentials.asMicronautProperties(): Map<Property, String> =
mapOf(
Property(ACCESS_KEY_PROPERTY, "AWS_ACCESS_KEY_ID") to accessKey,
Property(SECRET_KEY_PROPERTY, "AWS_SECRET_ACCESS_KEY") to secretKey,
Property(EXTERNAL_ID_PROPERTY, "AWS_ASSUME_ROLE_EXTERNAL_ID") to externalId,
AwsEnvVarConstants.ASSUME_ROLE_ACCESS_KEY to accessKey,
AwsEnvVarConstants.ASSUME_ROLE_SECRET_KEY to secretKey,
AwsEnvVarConstants.ASSUME_ROLE_EXTERNAL_ID to externalId,
)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
import io.airbyte.cdk.load.command.aws.AwsEnvVarConstants
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import java.nio.file.Files
Expand All @@ -26,13 +27,10 @@ object S3DataLakeTestUtil {

fun getAwsAssumeRoleCredentials(): AwsAssumeRoleCredentials {
val creds = getAwsAssumeRoleCredentialsAsMap()
val assumeRoleAccessKey = creds["AWS_ACCESS_KEY_ID"]!!
val assumeRoleSecretKey = creds["AWS_SECRET_ACCESS_KEY"]!!
val assumeRoleExternalId = creds["AWS_ASSUME_ROLE_EXTERNAL_ID"]!!
return AwsAssumeRoleCredentials(
assumeRoleAccessKey,
assumeRoleSecretKey,
assumeRoleExternalId,
creds[AwsEnvVarConstants.ASSUME_ROLE_ACCESS_KEY.environmentVariable]!!,
creds[AwsEnvVarConstants.ASSUME_ROLE_SECRET_KEY.environmentVariable]!!,
creds[AwsEnvVarConstants.ASSUME_ROLE_EXTERNAL_ID.environmentVariable]!!,
)
}

Expand Down

0 comments on commit 293e8f3

Please sign in to comment.