Support finding segments in AWS S3. (apache#3399)
* support finding segments from an AWS S3 storage.

* add more UTs

* address comments and add a document for the feature.

* update docs indentation

* update docs indentation

* address comments.
  1. add a UT for JSON ser/deser for the config object.
  2. more informative error message in a UT.

* address comments.
  1. use @Min to validate the configuration object
  2. change updateDescriptor to a string, as it does not take an argument otherwise

* fix a UT failure - delete a UT for testing default max length.
jaehc authored and fjy committed Oct 11, 2016
1 parent 1e79a1b commit 6f21778
Showing 9 changed files with 771 additions and 11 deletions.
24 changes: 24 additions & 0 deletions docs/content/operations/insert-segment-to-db.md
@@ -94,3 +94,27 @@ and `druid-hdfs-storage` in the extension list.

After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
Note that for segments stored in HDFS, the Druid config must contain core-site.xml as described in [Druid Docs](http://druid.io/docs/latest/tutorials/cluster.html), as this new location is stored with a relative path.

It is also possible to use `s3` as deep storage. In order to work with it, specify `s3` as the deep storage type and load
[`druid-s3-extensions`](../development/extensions-core/s3.html) as an extension.

```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-s3-extensions\"]
-Ddruid.storage.type=s3
-Ddruid.s3.accessKey=...
-Ddruid.s3.secretKey=...
-Ddruid.storage.bucket=your-bucket
-Ddruid.storage.baseKey=druid/storage/wikipedia
-Ddruid.storage.maxListingLength=1000
-cp $DRUID_CLASSPATH
io.druid.cli.Main tools insert-segment-to-db --workingDir "druid/storage/wikipedia" --updateDescriptor true
```

Note that you can provide the location of segments with either `druid.storage.baseKey` or `--workingDir`. If both are
specified, `--workingDir` takes priority. `druid.storage.maxListingLength` sets the maximum number of keys returned by
each object-listing request to `s3`; it defaults to 1000.
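
For reference, the tool scans the working directory for `descriptor.json` files and expects the matching `index.zip`
to sit next to each one, so the layout under the base key should look roughly like this (the path components below are
illustrative, following Druid's usual segment directory structure):

```
s3://your-bucket/druid/storage/wikipedia/<dataSource>/<interval>/<version>/<partitionNum>/descriptor.json
s3://your-bucket/druid/storage/wikipedia/<dataSource>/<interval>/<version>/<partitionNum>/index.zip
```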
122 changes: 122 additions & 0 deletions S3DataSegmentFinder.java (new file)
@@ -0,0 +1,122 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.storage.s3;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class S3DataSegmentFinder implements DataSegmentFinder
{
  private static final Logger log = new Logger(S3DataSegmentFinder.class);

  private final RestS3Service s3Client;
  private final ObjectMapper jsonMapper;
  private final S3DataSegmentPusherConfig config;

  @Inject
  public S3DataSegmentFinder(
      RestS3Service s3Client,
      S3DataSegmentPusherConfig config,
      ObjectMapper jsonMapper
  )
  {
    this.s3Client = s3Client;
    this.config = config;
    this.jsonMapper = jsonMapper;
  }

  @Override
  public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
  {
    final Set<DataSegment> segments = Sets.newHashSet();

    try {
      Iterator<StorageObject> objectsIterator = S3Utils.storageObjectsIterator(
          s3Client,
          config.getBucket(),
          workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath,
          config.getMaxListingLength()
      );

      while (objectsIterator.hasNext()) {
        StorageObject storageObject = objectsIterator.next();
        storageObject.closeDataInputStream();

        // Each segment directory is identified by its descriptor.json; the
        // corresponding index.zip must live right next to it.
        if (S3Utils.toFilename(storageObject.getKey()).equals("descriptor.json")) {
          final String descriptorJson = storageObject.getKey();
          String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson);

          if (S3Utils.isObjectInBucket(s3Client, config.getBucket(), indexZip)) {
            S3Object descriptorObject = s3Client.getObject(config.getBucket(), descriptorJson);

            try (InputStream is = descriptorObject.getDataInputStream()) {
              final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
              log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);

              // Rewrite the loadSpec if it doesn't already point at this bucket/key.
              final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
              if (!loadSpec.get("type").equals(S3StorageDruidModule.SCHEME) || !loadSpec.get("key").equals(indexZip)) {
                loadSpec.put("type", S3StorageDruidModule.SCHEME);
                loadSpec.put("key", indexZip);
                if (updateDescriptor) {
                  log.info(
                      "Updating loadSpec in descriptor.json at [%s] with new path [%s]",
                      descriptorJson,
                      indexZip
                  );
                  S3Object newDescJsonObject = new S3Object(descriptorJson, jsonMapper.writeValueAsString(dataSegment));
                  s3Client.putObject(config.getBucket(), newDescJsonObject);
                }
              }
              segments.add(dataSegment);
            }
          } else {
            throw new SegmentLoadingException(
                "index.zip didn't exist at [%s] while descriptor.json exists!?",
                indexZip
            );
          }
        }
      }
    }
    catch (ServiceException e) {
      throw new SegmentLoadingException(e, "Problem interacting with S3");
    }
    catch (IOException e) {
      throw new SegmentLoadingException(e, "IO exception");
    }
    catch (Exception e) {
      Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
      throw Throwables.propagate(e);
    }
    return segments;
  }
}
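
For illustration, a minimal sketch of driving the finder directly, outside the `insert-segment-to-db` tool. The class
name, the placeholder credentials, and the manual wiring are all assumptions for the sketch; the real tool assembles
these dependencies via Guice.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;

import java.util.Set;

public class FindSegmentsExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical manual wiring; in production these come from the injector.
    RestS3Service s3Client = new RestS3Service(new AWSCredentials("<accessKey>", "<secretKey>"));

    S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
    config.setBucket("your-bucket");
    config.setMaxListingLength(1000);

    // A plain ObjectMapper is a simplification: deserializing DataSegment in a real
    // deployment needs Druid's configured mapper (e.g. DefaultObjectMapper).
    S3DataSegmentFinder finder = new S3DataSegmentFinder(s3Client, config, new ObjectMapper());

    // Scan the prefix for descriptor.json files; 'false' leaves descriptors untouched.
    Set<DataSegment> segments = finder.findSegments("druid/storage/wikipedia", false);
    for (DataSegment segment : segments) {
      System.out.println(segment.getIdentifier());
    }
  }
}
```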
S3DataSegmentPuller.java
@@ -313,13 +313,6 @@ public String getVersion(URI uri) throws IOException
    }
  }

  private String toFilename(String key, final String suffix)
  {
    String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
    filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
    return filename;
  }

  private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
  {
    try {
S3DataSegmentPusherConfig.java
@@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonProperty;

import javax.validation.constraints.Min;

/**
*/
public class S3DataSegmentPusherConfig
@@ -34,6 +36,10 @@ public class S3DataSegmentPusherConfig
  @JsonProperty
  private boolean disableAcl = false;

  @JsonProperty
  @Min(0)
  private int maxListingLength = 1000;

  public void setBucket(String bucket)
  {
    this.bucket = bucket;

@@ -49,6 +55,11 @@ public void setDisableAcl(boolean disableAcl)
    this.disableAcl = disableAcl;
  }

  public void setMaxListingLength(int maxListingLength)
  {
    this.maxListingLength = maxListingLength;
  }

  public String getBucket()
  {
    return bucket;

@@ -63,4 +74,9 @@ public boolean getDisableAcl()
  {
    return disableAcl;
  }

  public int getMaxListingLength()
  {
    return maxListingLength;
  }
}
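
The commit message mentions a UT for JSON ser/deser of this config object; a minimal sketch of that round trip
(the class name and test values here are made up, and the property names follow the `@JsonProperty` fields above):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Assumes this class lives in (or imports) io.druid.storage.s3.
public class ConfigSerdeExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // Deserialize: unset fields keep their defaults (maxListingLength = 1000).
    String json = "{\"bucket\":\"your-bucket\",\"maxListingLength\":500}";
    S3DataSegmentPusherConfig config = mapper.readValue(json, S3DataSegmentPusherConfig.class);
    System.out.println(config.getMaxListingLength()); // 500

    // Serialize back out; @JsonProperty exposes the private fields.
    // Note: @Min(0) is enforced by Druid's JsonConfigProvider/validator,
    // not by a plain Jackson mapper.
    System.out.println(mapper.writeValueAsString(config));
  }
}
```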
S3StorageDruidModule.java
@@ -87,6 +87,7 @@ public void configure(Binder binder)
    Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class);
    Binders.dataSegmentArchiverBinder(binder).addBinding(SCHEME).to(S3DataSegmentArchiver.class).in(LazySingleton.class);
    Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
    Binders.dataSegmentFinderBinder(binder).addBinding("s3").to(S3DataSegmentFinder.class).in(LazySingleton.class);
    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);

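The new binding registers the finder in a Guice map keyed by deep-storage scheme. A hedged sketch of how a consumer
such as the `insert-segment-to-db` tool could resolve it; the lookup shape is an assumption based on
`dataSegmentFinderBinder` being a MapBinder, and the class name is made up:

```java
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.druid.segment.loading.DataSegmentFinder;

import java.util.Map;

public class FinderLookupExample
{
  // Assumed lookup shape: a MapBinder surfaces its bindings as an
  // injectable Map<String, DataSegmentFinder>.
  static DataSegmentFinder finderFor(Injector injector, String storageType)
  {
    Map<String, DataSegmentFinder> finders = injector.getInstance(
        Key.get(new TypeLiteral<Map<String, DataSegmentFinder>>() {})
    );
    return finders.get(storageType); // e.g. "s3", matching druid.storage.type
  }
}
```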
S3Utils.java
@@ -21,14 +21,19 @@

import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.metamx.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;

/**
*
@@ -107,6 +112,79 @@ public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName
    return true;
  }

  public static Iterator<StorageObject> storageObjectsIterator(
      final RestS3Service s3Client,
      final String bucket,
      final String prefix,
      final long maxListingLength
  )
  {
    return new Iterator<StorageObject>()
    {
      private StorageObjectsChunk objectsChunk;
      private int objectsChunkOffset;

      @Override
      public boolean hasNext()
      {
        // Lazily fetch the first chunk on the first call.
        if (objectsChunk == null) {
          objectsChunk = listObjectsChunkedAfter("");
          objectsChunkOffset = 0;
        }

        // Current chunk exhausted: stop if the listing is complete, otherwise
        // keep fetching chunks (looping guards against an empty, non-final chunk)
        // starting after the last key we saw.
        while (objectsChunk.getObjects().length <= objectsChunkOffset) {
          if (objectsChunk.isListingComplete()) {
            return false;
          }
          objectsChunk = listObjectsChunkedAfter(objectsChunk.getPriorLastKey());
          objectsChunkOffset = 0;
        }

        return true;
      }

      private StorageObjectsChunk listObjectsChunkedAfter(final String priorLastKey)
      {
        try {
          return retryS3Operation(
              new Callable<StorageObjectsChunk>()
              {
                @Override
                public StorageObjectsChunk call() throws Exception
                {
                  return s3Client.listObjectsChunked(
                      bucket, prefix, null, maxListingLength, priorLastKey
                  );
                }
              }
          );
        }
        catch (Exception e) {
          throw Throwables.propagate(e);
        }
      }

      @Override
      public StorageObject next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        StorageObject storageObject = objectsChunk.getObjects()[objectsChunkOffset];
        objectsChunkOffset++;

        return storageObject;
      }

      @Override
      public void remove()
      {
        throw new UnsupportedOperationException();
      }
    };
  }

  public static String constructSegmentPath(String baseKey, DataSegment segment)
  {

@@ -120,4 +198,21 @@ public static String descriptorPathForSegmentPath(String s3Path)
  {
    return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
  }

  public static String indexZipForSegmentPath(String s3Path)
  {
    return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/index.zip";
  }

  public static String toFilename(String key)
  {
    return toFilename(key, "");
  }

  public static String toFilename(String key, final String suffix)
  {
    String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
    filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
    return filename;
  }
}
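
A quick sketch of how these path helpers compose during a scan; the class name and the sample key are illustrative
only, real keys come from the S3 object listing:

```java
public class S3UtilsPathExample
{
  public static void main(String[] args)
  {
    // Illustrative key under the base key druid/storage/wikipedia.
    String descriptorKey = "druid/storage/wikipedia/wikipedia/2016-06-27/v1/0/descriptor.json";

    System.out.println(S3Utils.toFilename(descriptorKey));              // descriptor.json
    System.out.println(S3Utils.indexZipForSegmentPath(descriptorKey));  // .../v1/0/index.zip
    System.out.println(S3Utils.descriptorPathForSegmentPath(
        S3Utils.indexZipForSegmentPath(descriptorKey)));                // .../v1/0/descriptor.json
  }
}
```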