Skip to content

Commit

Permalink
Lazy instantiation for segmentKillers, segmentMovers, and segmentArchivers (apache#12207)
Browse files Browse the repository at this point in the history

* working

* Lazily load segmentKillers, segmentMovers, and segmentArchivers

* more tests

* test-jar plugin

* more coverage

* lazy client

* clean up changes

* checkstyle

* i did not change the branch condition

* adjust failure rate to run tests faster

* javadocs

* checkstyle
  • Loading branch information
jihoonson authored Feb 8, 2022
1 parent 4add251 commit ab3d994
Show file tree
Hide file tree
Showing 39 changed files with 864 additions and 91 deletions.
15 changes: 15 additions & 0 deletions cloud/gcp-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,19 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@

import javax.annotation.Nullable;

/**
* DataSegmentArchiver knows how to archive segments.
* Any implementation of DataSegmentArchiver is initialized when an ingestion job starts
* whenever its deep storage extension is loaded, even if that deep storage is not actually used.
* As a result, implementations should avoid initializing the deep storage client immediately
* and instead defer initialization until the client is actually used.
*/
@ExtensionPoint
public interface DataSegmentArchiver
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@

import java.io.IOException;

/**
* DataSegmentKiller knows how to kill segments from the Druid system.
* Any implementation of DataSegmentKiller is initialized when an ingestion job starts
* whenever its deep storage extension is loaded, even if that deep storage is not actually used.
* As a result, implementations should avoid initializing the deep storage client immediately
* and instead defer initialization until the client is actually used.
*/
@ExtensionPoint
public interface DataSegmentKiller
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@

import java.util.Map;

/**
* DataSegmentMover knows how to move the segment location from one to another.
* Any implementation of DataSegmentMover is initialized when an ingestion job starts
* whenever its deep storage extension is loaded, even if that deep storage is not actually used.
* As a result, implementations should avoid initializing the deep storage client immediately
* and instead defer initialization until the client is actually used.
*/
@ExtensionPoint
public interface DataSegmentMover
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.aliyun.oss.OSS;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
Expand All @@ -39,7 +40,7 @@ public class OssDataSegmentArchiver extends OssDataSegmentMover implements DataS
@Inject
public OssDataSegmentArchiver(
@Json ObjectMapper mapper,
OSS client,
Supplier<OSS> client,
OssDataSegmentArchiverConfig archiveConfig,
OssStorageConfig restoreConfig
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSException;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
Expand All @@ -37,18 +38,26 @@ public class OssDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(OssDataSegmentKiller.class);

private final OSS client;
/**
* Any implementation of DataSegmentKiller is initialized when an ingestion job starts if the extension is loaded,
* even when the implementation of DataSegmentKiller is not used. As a result, if we have an OSS client instead
* of a supplier of it, it can cause unnecessary config validation for OSS even when it's not used at all.
* To perform the config validation only when it is actually used, we use a supplier.
*
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private final Supplier<OSS> clientSupplier;
private final OssStorageConfig segmentPusherConfig;
private final OssInputDataConfig inputDataConfig;

@Inject
public OssDataSegmentKiller(
OSS client,
Supplier<OSS> clientSupplier,
OssStorageConfig segmentPusherConfig,
OssInputDataConfig inputDataConfig
)
{
this.client = client;
this.clientSupplier = clientSupplier;
this.segmentPusherConfig = segmentPusherConfig;
this.inputDataConfig = inputDataConfig;
}
Expand All @@ -61,6 +70,7 @@ public void kill(DataSegment segment) throws SegmentLoadingException
String bucket = MapUtils.getString(loadSpec, "bucket");
String path = MapUtils.getString(loadSpec, "key");

final OSS client = this.clientSupplier.get();
if (client.doesObjectExist(bucket, path)) {
log.info("Removing index file[%s://%s/%s] from aliyun OSS!", OssStorageDruidModule.SCHEME, bucket, path);
client.deleteObject(bucket, path);
Expand All @@ -83,7 +93,7 @@ public void killAll() throws IOException
);
try {
OssUtils.deleteObjectsInPath(
client,
clientSupplier.get(),
inputDataConfig,
segmentPusherConfig.getBucket(),
segmentPusherConfig.getPrefix(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.StorageClass;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand All @@ -49,16 +50,24 @@ public class OssDataSegmentMover implements DataSegmentMover
{
private static final Logger log = new Logger(OssDataSegmentMover.class);

private final OSS client;
/**
* Any implementation of DataSegmentMover is initialized when an ingestion job starts if the extension is loaded
* even when the implementation of DataSegmentMover is not used. As a result, if we accept an OSS client instead
* of a supplier of it, it can cause unnecessary config validation for OSS even when it's not used at all.
* To perform the config validation only when it is actually used, we use a supplier.
*
* See OmniDataSegmentMover for how DataSegmentMovers are initialized.
*/
private final Supplier<OSS> clientSupplier;
private final OssStorageConfig config;

@Inject
public OssDataSegmentMover(
OSS client,
Supplier<OSS> client,
OssStorageConfig config
)
{
this.client = client;
this.clientSupplier = client;
this.config = config;
}

Expand Down Expand Up @@ -167,6 +176,7 @@ private void selfCheckingMove(
log.info("No need to move file[%s://%s/%s] onto itself", OssStorageDruidModule.SCHEME, srcBucket, srcPath);
return;
}
final OSS client = this.clientSupplier.get();
if (client.doesObjectExist(srcBucket, srcPath)) {
final ObjectListing listResult = client.listObjects(
new ListObjectsRequest(srcBucket, srcPath, null, null, 1)
Expand Down Expand Up @@ -238,7 +248,7 @@ private void deleteWithRetries(final String bucket, final String path) throws Ex
RetryUtils.retry(
() -> {
try {
client.deleteObject(bucket, path);
clientSupplier.get().deleteObject(bucket, path);
return null;
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.aliyun.oss.OSS;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
Expand Down Expand Up @@ -97,10 +99,26 @@ public void configure(Binder binder)
binder.bind(OssTaskLogs.class).in(LazySingleton.class);
}

/**
* Creates {@link OSS} which may perform config validation immediately.
* You may want to avoid immediate config validation but defer it until you actually use the OSS client.
* Use {@link #initializeOssClientSupplier} instead in that case.
*/
@Provides
@LazySingleton
public OSS initializeOssClient(OssClientConfig inputSourceConfig)
{
return inputSourceConfig.buildClient();
}

/**
* Creates a supplier that lazily initializes {@link OSS}.
* You may want to use the supplier to defer config validation until you actually use the OSS client.
*/
@Provides
@LazySingleton
public Supplier<OSS> initializeOssClientSupplier(OssClientConfig inputSourceConfig)
{
return Suppliers.memoize(() -> initializeOssClient(inputSourceConfig));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -72,8 +74,8 @@ public String getArchiveBaseKey()
}
};
private static final OssStorageConfig PUSHER_CONFIG = new OssStorageConfig();
private static final OSS OSS_CLIENT = EasyMock.createStrictMock(OSSClient.class);
private static final OssDataSegmentPuller PULLER = new OssDataSegmentPuller(OSS_CLIENT);
private static final Supplier<OSS> OSS_CLIENT = Suppliers.ofInstance(EasyMock.createStrictMock(OSSClient.class));
private static final OssDataSegmentPuller PULLER = new OssDataSegmentPuller(OSS_CLIENT.get());
private static final DataSegment SOURCE_SEGMENT = DataSegment
.builder()
.binaryVersion(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.aliyun.oss.OSS;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.OSSObjectSummary;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void test_killAll_accountConfigWithNullBucketAndBaseKey_throwsISEExceptio

EasyMock.replay(client, segmentPusherConfig, inputDataConfig);

segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
segmentKiller = new OssDataSegmentKiller(Suppliers.ofInstance(client), segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
}
catch (ISE e) {
Expand Down Expand Up @@ -119,7 +120,7 @@ public void test_killAll_noException_deletesAllSegments() throws IOException

EasyMock.replay(client, segmentPusherConfig, inputDataConfig);

segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
segmentKiller = new OssDataSegmentKiller(Suppliers.ofInstance(client), segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
}
Expand Down Expand Up @@ -154,7 +155,7 @@ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegmen

EasyMock.replay(client, segmentPusherConfig, inputDataConfig);

segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
segmentKiller = new OssDataSegmentKiller(Suppliers.ofInstance(client), segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
}
Expand Down Expand Up @@ -192,7 +193,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg

EasyMock.replay(client, segmentPusherConfig, inputDataConfig);

segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
segmentKiller = new OssDataSegmentKiller(Suppliers.ofInstance(client), segmentPusherConfig, inputDataConfig);
segmentKiller.killAll();
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.PutObjectResult;
import com.aliyun.oss.model.StorageClass;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class OssDataSegmentMoverTest
public void testMove() throws Exception
{
MockClient mockClient = new MockClient();
OssDataSegmentMover mover = new OssDataSegmentMover(mockClient, new OssStorageConfig());
OssDataSegmentMover mover = new OssDataSegmentMover(Suppliers.ofInstance(mockClient), new OssStorageConfig());

mockClient.putObject(
"main",
Expand All @@ -92,7 +93,7 @@ public void testMove() throws Exception
public void testMoveNoop() throws Exception
{
MockClient mockOssClient = new MockClient();
OssDataSegmentMover mover = new OssDataSegmentMover(mockOssClient, new OssStorageConfig());
OssDataSegmentMover mover = new OssDataSegmentMover(Suppliers.ofInstance(mockOssClient), new OssStorageConfig());

mockOssClient.putObject(
"archive",
Expand All @@ -118,7 +119,7 @@ public void testMoveNoop() throws Exception
public void testMoveException() throws Exception
{
MockClient mockClient = new MockClient();
OssDataSegmentMover mover = new OssDataSegmentMover(mockClient, new OssStorageConfig());
OssDataSegmentMover mover = new OssDataSegmentMover(Suppliers.ofInstance(mockClient), new OssStorageConfig());

mover.move(
SOURCE_SEGMENT,
Expand All @@ -130,7 +131,7 @@ public void testMoveException() throws Exception
public void testIgnoresGoneButAlreadyMoved() throws Exception
{
MockClient mockOssClient = new MockClient();
OssDataSegmentMover mover = new OssDataSegmentMover(mockOssClient, new OssStorageConfig());
OssDataSegmentMover mover = new OssDataSegmentMover(Suppliers.ofInstance(mockOssClient), new OssStorageConfig());
mover.move(new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
Expand All @@ -153,7 +154,7 @@ public void testIgnoresGoneButAlreadyMoved() throws Exception
public void testFailsToMoveMissing() throws Exception
{
MockClient client = new MockClient();
OssDataSegmentMover mover = new OssDataSegmentMover(client, new OssStorageConfig());
OssDataSegmentMover mover = new OssDataSegmentMover(Suppliers.ofInstance(client), new OssStorageConfig());
mover.move(new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
Expand Down
Loading

0 comments on commit ab3d994

Please sign in to comment.