Skip to content

Commit

Permalink
Refactor how we configure and build cloud storage clients in CloudLogs (
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Dec 8, 2021
1 parent cbe01e9 commit bdda7e1
Show file tree
Hide file tree
Showing 27 changed files with 582 additions and 367 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.helpers.LogConfiguration;
import io.airbyte.config.storage.CloudStorageConfigs;
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -101,22 +104,37 @@ public class EnvConfigs implements Configs {
public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;
private final LogConfiguration logConfiguration;
private final LogConfigs logConfigs;

/**
 * Creates a configuration reader that looks variables up through {@link System#getenv(String)}.
 * Delegates to the constructor that accepts the lookup function.
 */
public EnvConfigs() {
this(System::getenv);
}

EnvConfigs(final Function<String, String> getEnv) {
this.getEnv = getEnv;
this.logConfiguration = new LogConfiguration(
getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, ""),
getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET_REGION, ""),
getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(LogClientSingleton.S3_MINIO_ENDPOINT, ""),
getEnvOrDefault(LogClientSingleton.GCS_LOG_BUCKET, ""),
getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, ""));
this.logConfigs = new LogConfigs(getLogConfiguration().orElse(null));
}

/**
 * Selects the cloud storage backend used for logs, based on which environment variables are set.
 *
 * <p>
 * Backends are probed in a fixed order and the first match wins:
 * {@code LogClientSingleton.GCS_LOG_BUCKET} selects GCS, then
 * {@code LogClientSingleton.S3_MINIO_ENDPOINT} selects Minio, then
 * {@code LogClientSingleton.S3_LOG_BUCKET_REGION} selects S3.
 *
 * <p>
 * A variable only counts as set when it is present AND not blank. Env files commonly declare a
 * variable with an empty value (e.g. {@code GCS_LOG_BUCKET=}); a plain null check would treat that
 * as "GCS is configured" and produce a storage config with a blank bucket.
 *
 * @return the storage configuration for the first backend whose selector variable is set, or
 *         {@link Optional#empty()} when none is set
 */
private Optional<CloudStorageConfigs> getLogConfiguration() {
  final String gcsBucket = getEnv(LogClientSingleton.GCS_LOG_BUCKET);
  if (gcsBucket != null && !gcsBucket.isBlank()) {
    return Optional.of(CloudStorageConfigs.gcs(new GcsConfig(
        getEnvOrDefault(LogClientSingleton.GCS_LOG_BUCKET, ""),
        getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, ""))));
  }

  final String minioEndpoint = getEnv(LogClientSingleton.S3_MINIO_ENDPOINT);
  if (minioEndpoint != null && !minioEndpoint.isBlank()) {
    return Optional.of(CloudStorageConfigs.minio(new MinioConfig(
        getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, ""),
        getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
        getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
        getEnvOrDefault(LogClientSingleton.S3_MINIO_ENDPOINT, ""))));
  }

  final String s3Region = getEnv(LogClientSingleton.S3_LOG_BUCKET_REGION);
  if (s3Region != null && !s3Region.isBlank()) {
    return Optional.of(CloudStorageConfigs.s3(new S3Config(
        getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, ""),
        getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
        getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
        getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET_REGION, ""))));
  }

  // No selector variable carries a usable value: cloud log storage is not configured.
  return Optional.empty();
}

// CORE
Expand Down Expand Up @@ -402,9 +420,8 @@ public String getJobPodMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_POD_MEMORY_REQUIREMENT);
}

// Logging/Monitoring/Tracking
public LogConfigs getLogConfigs() {
return logConfiguration;
return logConfigs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.config.helpers;

import io.airbyte.config.storage.DefaultGcsClientFactory;
import io.airbyte.config.storage.DefaultS3ClientFactory;
import io.airbyte.config.storage.MinioS3ClientFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -35,62 +38,20 @@ public interface CloudLogs {

void deleteLogs(LogConfigs configs, String logPath);

/**
 * Reports whether the given configs describe no cloud log destination at all.
 *
 * @param configs logging configuration to inspect
 * @return true when none of the Minio, S3 or GCP configuration checks passes
 */
static boolean hasEmptyConfigs(final LogConfigs configs) {
  // Checks run in the order Minio, S3, GCP and stop at the first one that is configured.
  final boolean anyBackendConfigured =
      hasMinioConfiguration(configs) || hasS3Configuration(configs) || hasGcpConfiguration(configs);
  return !anyBackendConfigured;
}

static CloudLogs createCloudLogClient(final LogConfigs configs) {
// check if the configs exists, and pick a client.
if (hasMinioConfiguration(configs)) {
LOGGER.info("Creating Minio Log Client");
return new S3Logs();
}

if (hasS3Configuration(configs)) {
LOGGER.info("Creating AWS Log Client");
return new S3Logs();
}

if (hasGcpConfiguration(configs)) {
LOGGER.info("Creating GCS Log Client");
return new GcsLogs();
switch (configs.getStorageConfigs().getType()) {
case S3 -> {
return new S3Logs(new DefaultS3ClientFactory(configs.getStorageConfigs().getS3Config()));
}
case MINIO -> {
return new S3Logs(new MinioS3ClientFactory(configs.getStorageConfigs().getMinioConfig()));
}
case GCS -> {
return new GcsLogs(new DefaultGcsClientFactory(configs.getStorageConfigs().getGcsConfig()));
}
}

throw new RuntimeException("Error no cloud credentials configured..");
}

/**
 * Decides whether logs are configured to go to a Minio instance. That is the case when the Minio
 * endpoint is set in addition to the S3 bucket and both AWS credentials.
 *
 * @param configs contains the configuration values to check
 * @return true only if the S3 log bucket, AWS access key, AWS secret access key and Minio endpoint
 *         are all non-blank
 */
private static boolean hasMinioConfiguration(final LogConfigs configs) {
  // Each guard rejects as soon as one required value is blank; later getters are then not called.
  if (configs.getS3LogBucket().isBlank()) {
    return false;
  }
  if (configs.getAwsAccessKey().isBlank()) {
    return false;
  }
  if (configs.getAwsSecretAccessKey().isBlank()) {
    return false;
  }
  return !configs.getS3MinioEndpoint().isBlank();
}

/**
 * Decides whether logs are configured to go to S3. That is the case when the bucket region is set
 * in addition to both AWS credentials and the S3 bucket.
 *
 * @param configs contains the configuration values to check
 * @return true only if the AWS access key, AWS secret access key, S3 bucket region and S3 bucket
 *         are all non-blank
 */
private static boolean hasS3Configuration(final LogConfigs configs) {
  // "All present" expressed as "none missing"; evaluation order and short-circuiting are kept.
  final boolean somethingMissing = configs.getAwsAccessKey().isBlank()
      || configs.getAwsSecretAccessKey().isBlank()
      || configs.getS3LogBucketRegion().isBlank()
      || configs.getS3LogBucket().isBlank();
  return !somethingMissing;
}

/**
 * Decides whether logs are configured to go to GCS.
 *
 * @param configs contains the configuration values to check
 * @return true only if the GCS log bucket and the Google application credentials are both
 *         non-blank
 */
private static boolean hasGcpConfiguration(final LogConfigs configs) {
  if (configs.getGcsLogBucket().isBlank()) {
    // Without a bucket the credentials are not consulted at all.
    return false;
  }
  return !configs.getGoogleApplicationCredentials().isBlank();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@

package io.airbyte.config.helpers;

import com.google.api.client.util.Preconditions;
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Blob.BlobSourceOption;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.EnvConfigs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,19 +26,28 @@ public class GcsLogs implements CloudLogs {
private static final Logger LOGGER = LoggerFactory.getLogger(GcsLogs.class);

private static Storage GCS;
private final Supplier<Storage> gcsClientFactory;

/**
 * Creates a GCS-backed log client.
 *
 * @param gcsClientFactory supplier of the {@link Storage} client; it is only stored here, not
 *        invoked
 */
public GcsLogs(final Supplier<Storage> gcsClientFactory) {
this.gcsClientFactory = gcsClientFactory;
}

/**
 * Downloads the logs found under {@code logPath} by delegating to {@code getFile} with
 * {@link LogClientSingleton#DEFAULT_PAGE_SIZE} as the page size.
 */
@Override
public File downloadCloudLog(final LogConfigs configs, final String logPath) throws IOException {
  final var pageSize = LogClientSingleton.DEFAULT_PAGE_SIZE;
  return getFile(configs, logPath, pageSize);
}

static File getFile(final LogConfigs configs, final String logPath, final int pageSize) throws IOException {
private File getFile(final LogConfigs configs, final String logPath, final int pageSize) throws IOException {
return getFile(getOrCreateGcsClient(), configs, logPath, pageSize);
}

@VisibleForTesting
static File getFile(final Storage gcsClient, final LogConfigs configs, final String logPath, final int pageSize) throws IOException {
LOGGER.debug("Retrieving logs from GCS path: {}", logPath);
createGcsClientIfNotExists(configs);

LOGGER.debug("Start GCS list request.");
final Page<Blob> blobs = GCS.list(
configs.getGcsLogBucket(),
final Page<Blob> blobs = gcsClient.list(
configs.getStorageConfigs().getGcsConfig().getBucketName(),
Storage.BlobListOption.prefix(logPath),
Storage.BlobListOption.pageSize(pageSize));

Expand All @@ -59,11 +67,12 @@ static File getFile(final LogConfigs configs, final String logPath, final int pa
@Override
public List<String> tailCloudLog(final LogConfigs configs, final String logPath, final int numLines) throws IOException {
LOGGER.debug("Tailing logs from GCS path: {}", logPath);
createGcsClientIfNotExists(configs);
final Storage gcsClient = getOrCreateGcsClient();

LOGGER.debug("Start GCS list request.");
final Page<Blob> blobs = GCS.list(
configs.getGcsLogBucket(),

final Page<Blob> blobs = gcsClient.list(
configs.getStorageConfigs().getGcsConfig().getBucketName(),
Storage.BlobListOption.prefix(logPath));

final var ascendingTimestampBlobs = new ArrayList<Blob>();
Expand Down Expand Up @@ -99,44 +108,21 @@ public List<String> tailCloudLog(final LogConfigs configs, final String logPath,
@Override
public void deleteLogs(final LogConfigs configs, final String logPath) {
LOGGER.debug("Retrieving logs from GCS path: {}", logPath);
createGcsClientIfNotExists(configs);
final Storage gcsClient = getOrCreateGcsClient();

LOGGER.debug("Start GCS list and delete request.");
final Page<Blob> blobs = GCS.list(configs.getGcsLogBucket(), Storage.BlobListOption.prefix(logPath));
final Page<Blob> blobs = gcsClient.list(configs.getStorageConfigs().getGcsConfig().getBucketName(), Storage.BlobListOption.prefix(logPath));
for (final Blob blob : blobs.iterateAll()) {
blob.delete(BlobSourceOption.generationMatch());
}
LOGGER.debug("Finished all deletes.");
}

private static void createGcsClientIfNotExists(final LogConfigs configs) {
private Storage getOrCreateGcsClient() {
if (GCS == null) {
Preconditions.checkNotNull(configs.getGcsLogBucket());
Preconditions.checkNotNull(configs.getGoogleApplicationCredentials());

GCS = StorageOptions.getDefaultInstance().getService();
}
}

public static void main(final String[] args) throws IOException {
final Storage storage = StorageOptions.getDefaultInstance().getService();
final var bucket = "davin-kube-logging-test";
final Page<Blob> blobs =
storage.list(
bucket,
Storage.BlobListOption.prefix("app-logging/workspace/server/logs"),
Storage.BlobListOption.pageSize(1));

final var randomName = Strings.addRandomSuffix("logs", "-", 5);
final var tmpOutputFile = new File("/tmp/" + randomName);
final var os = new FileOutputStream(tmpOutputFile);
for (final Blob blob : blobs.iterateAll()) {
System.out.println(blob.getName());
blob.downloadTo(os);
GCS = gcsClientFactory.get();
}
os.close();
final var data = new GcsLogs().tailCloudLog((new EnvConfigs()).getLogConfigs(), "tail", 6);
System.out.println(data);
return GCS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@

package io.airbyte.config.helpers;

import io.airbyte.config.storage.CloudStorageConfigs;

/**
* Configuration required to retrieve logs. This is a subset of the methods defined in
* {@link io.airbyte.config.Configs} so actual look up can be delegated in {@link LogConfiguration}.
* This prevents conflicting configuration existing at once.
* Describes logging configuration. For now it just contains configuration around storage medium,
* but in the future will have other configuration options (e.g. json logging, etc).
*/
public interface LogConfigs {

String getS3LogBucket();

String getS3LogBucketRegion();

String getAwsAccessKey();
public class LogConfigs {

String getAwsSecretAccessKey();
public static LogConfigs EMPTY = new LogConfigs(null);

String getS3MinioEndpoint();
private final CloudStorageConfigs storageConfigs;

String getGcsLogBucket();
public LogConfigs(final CloudStorageConfigs storageConfigs) {
this.storageConfigs = storageConfigs;
}

String getGoogleApplicationCredentials();
public CloudStorageConfigs getStorageConfigs() {
return storageConfigs;
}

}

This file was deleted.

Loading

0 comments on commit bdda7e1

Please sign in to comment.