Fix S3 deep storage push and s3 insert-segment-to-db (apache#4174)
* Fix S3 deep storage push and s3 insert-segment-to-db

* Less verbose checks in S3DataSegmentFinder
jerchung authored and drcrallen committed Apr 15, 2017
1 parent 8432138 commit 0bcfd93
Showing 4 changed files with 87 additions and 5 deletions.
S3DataSegmentFinder.java
@@ -87,9 +87,12 @@ public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor
         log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);

         final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
-        if (!loadSpec.get("type").equals(S3StorageDruidModule.SCHEME) || !loadSpec.get("key").equals(indexZip)) {
+        if (!S3StorageDruidModule.SCHEME.equals(loadSpec.get("type")) ||
+            !indexZip.equals(loadSpec.get("key")) ||
+            !config.getBucket().equals(loadSpec.get("bucket"))) {
           loadSpec.put("type", S3StorageDruidModule.SCHEME);
           loadSpec.put("key", indexZip);
+          loadSpec.put("bucket", config.getBucket());
           if (updateDescriptor) {
             log.info(
                 "Updating loadSpec in descriptor.json at [%s] with new path [%s]",
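A note on the flipped comparisons above: calling equals on the known constant first is null-safe. The finder test further down stores a segment with an empty loadSpec, and against such a descriptor the old receiver-first style would throw a NullPointerException on the missing "type" entry rather than flag the loadSpec for rewriting. A minimal stand-alone sketch (class name hypothetical, not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class NullSafeCheckDemo
{
  public static void main(String[] args)
  {
    // A descriptor whose loadSpec is empty, as in the new finder test.
    Map<String, Object> loadSpec = new HashMap<>();

    // Old ordering: loadSpec.get("type") is null here, so this would throw
    // a NullPointerException instead of detecting the stale loadSpec.
    // boolean needsUpdate = !loadSpec.get("type").equals("s3_zip");

    // New ordering: the non-null constant is the receiver, so a missing
    // entry simply compares unequal and the loadSpec gets rewritten.
    boolean needsUpdate = !"s3_zip".equals(loadSpec.get("type"));
    System.out.println(needsUpdate);  // true
  }
}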
S3DataSegmentPusher.java
@@ -120,7 +120,7 @@ public DataSegment call() throws Exception
               .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

           File descriptorFile = File.createTempFile("druid", "descriptor.json");
-          Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile);
+          Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(outSegment)), descriptorFile);
           S3Object descriptorObject = new S3Object(descriptorFile);
           descriptorObject.setBucketName(outputBucket);
           descriptorObject.setKey(s3DescriptorPath);
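The one-word change above (inSegment to outSegment) is the push fix named in the commit title: push() derives a new DataSegment whose loadSpec carries the S3 bucket and key, but descriptor.json was being serialized from the input segment, so the descriptor stored on S3 lacked those coordinates and a later insert-segment-to-db run had nothing to work with. A stand-alone sketch of the difference in descriptor content (Jackson only; values taken from the pusher test below, class name hypothetical):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class DescriptorContentDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // loadSpec of the segment handed to push(): no S3 coordinates yet.
    Map<String, Object> inLoadSpec = new HashMap<>();

    // loadSpec of the segment push() derives before uploading.
    Map<String, Object> outLoadSpec = new HashMap<>();
    outLoadSpec.put("type", "s3_zip");
    outLoadSpec.put("bucket", "bucket");
    outLoadSpec.put("key", "key/foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/index.zip");

    // The bug serialized the former into descriptor.json; the fix
    // serializes the latter.
    System.out.println("buggy descriptor loadSpec: " + mapper.writeValueAsString(inLoadSpec));
    System.out.println("fixed descriptor loadSpec: " + mapper.writeValueAsString(outLoadSpec));
  }
}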
S3DataSegmentFinderTest.java
@@ -295,6 +295,45 @@ public void testFindSegmentsWithworkingDirPath() throws SegmentLoadingException
     Assert.assertEquals(5, segments.size());
   }

+  @Test
+  public void testFindSegmentsUpdateLoadSpec() throws Exception {
+    config.setBucket("amazing");
+    final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1)
+        .loadSpec(ImmutableMap.of())
+        .build();
+    final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
+    final String segmentPath = baseKey + "/interval_missing_load_spec/v1/1/";
+    final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath);
+    final String indexPath = S3Utils.indexZipForSegmentPath(segmentPath);
+
+    mockS3Client.putObject(
+        config.getBucket(),
+        new S3Object(
+            descriptorPath,
+            mapper.writeValueAsString(segmentMissingLoadSpec)
+        )
+    );
+
+    mockS3Client.putObject(
+        config.getBucket(),
+        new S3Object(
+            indexPath,
+            "dummy"
+        )
+    );
+
+    Set<DataSegment> segments = s3DataSegmentFinder.findSegments(segmentPath, false);
+    Assert.assertEquals(1, segments.size());
+
+    // Guaranteed there's only 1 element due to prior assert
+    DataSegment testSegment = segments.iterator().next();
+    Map<String, Object> testLoadSpec = testSegment.getLoadSpec();
+
+    Assert.assertEquals("amazing", testLoadSpec.get("bucket"));
+    Assert.assertEquals("s3_zip", testLoadSpec.get("type"));
+    Assert.assertEquals(indexPath, testLoadSpec.get("key"));
+  }
+
   private String getDescriptorPath(DataSegment segment)
   {
     return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
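For orientation, the finder exercised here is the same component the insert-segment-to-db tool drives, and the updateDescriptor flag (false in this test) controls whether stale descriptor.json files are rewritten in place. A hedged usage sketch; the wrapper class and path are illustrative, while the constructor and findSegments calls mirror the test above:

import io.druid.storage.s3.S3DataSegmentFinder;
import io.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;

public class FinderUsageSketch
{
  // `finder` is assumed to be wired up as in the test:
  // new S3DataSegmentFinder(s3Client, config, mapper).
  static void printSegments(S3DataSegmentFinder finder) throws Exception
  {
    // true = rewrite descriptor.json wherever the loadSpec is stale,
    // which after this commit includes a missing or wrong "bucket".
    Set<DataSegment> segments = finder.findSegments("baseKey/some/path", true);
    for (DataSegment segment : segments) {
      Map<String, Object> loadSpec = segment.getLoadSpec();
      System.out.println(
          segment.getIdentifier() + " -> s3://" + loadSpec.get("bucket") + "/" + loadSpec.get("key")
      );
    }
  }
}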
S3DataSegmentPusherTest.java
@@ -19,13 +19,17 @@

 package io.druid.storage.s3;

+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.commons.io.IOUtils;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Object;
 import org.joda.time.Interval;
@@ -40,15 +40,40 @@
  */
 public class S3DataSegmentPusherTest
 {
+  private class ValueContainer<T> {
+    private T value;
+
+    public T getValue() {
+      return value;
+    }
+
+    public void setValue(T value) {
+      this.value = value;
+    }
+  }
+
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();

   @Test
   public void testPush() throws Exception
   {
     RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
-    EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.<S3Object>anyObject()))
-        .andReturn(null)
+
+    Capture<S3Object> capturedS3Object = Capture.newInstance();
+    ValueContainer<String> capturedS3SegmentJson = new ValueContainer<>();
+    EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.capture(capturedS3Object)))
+        .andAnswer(
+            new IAnswer<S3Object>() {
+              @Override
+              public S3Object answer() throws Throwable {
+                capturedS3SegmentJson.setValue(
+                    IOUtils.toString(capturedS3Object.getValue().getDataInputStream(), "utf-8")
+                );
+                return null;
+              }
+            }
+        )
         .atLeastOnce();
     EasyMock.replay(s3Client);

@@ -57,7 +86,8 @@ public void testPush() throws Exception
     config.setBucket("bucket");
     config.setBaseKey("key");

-    S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config, new DefaultObjectMapper());
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config, objectMapper);

     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");
@@ -81,6 +111,16 @@ public void testPush() throws Exception
     DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);

     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+    Assert.assertEquals(1, (int) segment.getBinaryVersion());
+    Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket"));
+    Assert.assertEquals(
+        "key/foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/index.zip",
+        segment.getLoadSpec().get("key"));
+    Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type"));
+
+    // Verify that the pushed S3Object contains the correct data
+    String segmentJson = objectMapper.writeValueAsString(segment);
+    Assert.assertEquals(segmentJson, capturedS3SegmentJson.getValue());

     EasyMock.verify(s3Client);
   }
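The Capture-plus-IAnswer pattern introduced in this test exists because the captured S3Object wraps a temp descriptor file: its stream has to be read at call time, inside answer(), while the backing data still exists; capturing the object and reading its stream after push() returns could be too late. A self-contained sketch of the same pattern against a hypothetical interface (names and types here are illustrative, not the jets3t API):

import org.apache.commons.io.IOUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class CaptureAtCallTimeDemo
{
  // Hypothetical stand-in for RestS3Service.putObject().
  interface Uploader
  {
    Object put(String bucket, InputStream body) throws Exception;
  }

  public static void main(String[] args) throws Exception
  {
    Uploader uploader = EasyMock.createStrictMock(Uploader.class);

    final Capture<InputStream> capturedBody = Capture.newInstance();
    final String[] seenPayload = new String[1];  // plays the role of ValueContainer<String>

    EasyMock.expect(uploader.put(EasyMock.anyString(), EasyMock.capture(capturedBody)))
            .andAnswer(
                new IAnswer<Object>()
                {
                  @Override
                  public Object answer() throws Throwable
                  {
                    // Drain the stream now, while the backing data still exists.
                    seenPayload[0] = IOUtils.toString(capturedBody.getValue(), "utf-8");
                    return null;
                  }
                }
            )
            .atLeastOnce();
    EasyMock.replay(uploader);

    uploader.put("bucket", new ByteArrayInputStream("hello".getBytes("utf-8")));

    System.out.println(seenPayload[0]);  // prints: hello
    EasyMock.verify(uploader);
  }
}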
