Namespace optimization for hdfs data segments. (apache#3877)
* NameNode (NN) optimization for HDFS data segments.

* HdfsDataSegmentKiller and HdfsDataSegmentFinder changes to use the new storage format. Docs update.

* Common utility function in DataSegmentPusherUtil.

* New static method `makeSegmentOutputPathUptoVersionForHdfs` in JobHelper

* Reuse `getHdfsStorageDirUptoVersion` in `DataSegmentPusherUtil.getHdfsStorageDir()`

* Addressed comments.

* Review comments.

* HdfsDataSegmentKiller requested changes.

* extra newline

* Add maprfs.
akashdw authored and cheddar committed Mar 1, 2017
1 parent cc20133 commit 94da5e8
Showing 17 changed files with 461 additions and 130 deletions.
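
The heart of the change, visible in the diffs below: HDFS segments lose their per-partition directory, and the partition number instead becomes a prefix on the descriptor and index file names, so each segment costs the NameNode fewer filesystem objects. A rough sketch of the two layouts (example class and paths, not code from this commit):

```java
// Illustrative sketch only -- example class and paths, not code from this commit.
public class HdfsSegmentLayoutExample
{
  // Old layout: one directory per partition holding descriptor.json and index.zip.
  static String oldIndexPath(String base, String dataSource, String interval, String version, int partitionNum)
  {
    return String.format("%s/%s/%s/%s/%d/index.zip", base, dataSource, interval, version, partitionNum);
  }

  // New layout: the partition number becomes a file-name prefix, so the extra directory
  // (and the NameNode objects it costs) disappears.
  static String newIndexPath(String base, String dataSource, String interval, String version, int partitionNum)
  {
    return String.format("%s/%s/%s/%s/%d_index.zip", base, dataSource, interval, version, partitionNum);
  }
}
```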
@@ -30,11 +30,12 @@
public interface DataSegmentFinder
{
/**
* This method should first recursively look for descriptor.json underneath workingDirPath and then verify that
* index.zip exists in the same folder. If not, it should throw SegmentLoadingException to let the caller know that
* descriptor.json exists while index.zip doesn't. If a segment is found and updateDescriptor is set, then this method
* should update the loadSpec in descriptor.json to reflect the location from where it was found. After the search,
* this method should return the set of segments that were found.
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
* If not, it should throw SegmentLoadingException to let the caller know that descriptor.json exists
* while index.zip doesn't. If a segment is found and updateDescriptor is set, then this method should update the
* loadSpec in descriptor.json to reflect the location from where it was found. After the search, this method
* should return the set of segments that were found.
*
* @param workingDirPath the String representation of the working directory path
* @param updateDescriptor if true, update loadSpec in descriptor.json if loadSpec's location is different from where
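
As a rough illustration of this contract, an implementation can walk the working directory, pair every descriptor it finds with the index archive expected beside it, and fail when the archive is missing. The sketch below uses plain `java.nio` for brevity; it is not the HDFS implementation changed in this commit, and it omits the `updateDescriptor` handling:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Minimal sketch of the DataSegmentFinder contract described above (not the HDFS implementation).
public class FindSegmentsSketch
{
  static List<Path> findDescriptors(Path workingDir) throws IOException
  {
    final List<Path> descriptors = new ArrayList<>();
    try (Stream<Path> paths = Files.walk(workingDir)) {
      paths.filter(p -> p.getFileName().toString().endsWith("descriptor.json")).forEach(descriptors::add);
    }
    for (Path descriptor : descriptors) {
      // Each descriptor must sit next to its index archive:
      // "descriptor.json" next to "index.zip", or "N_descriptor.json" next to "N_index.zip".
      final Path indexZip = descriptor.resolveSibling(
          descriptor.getFileName().toString().replace("descriptor.json", "index.zip")
      );
      if (!Files.exists(indexZip)) {
        throw new IOException("Found " + descriptor + " but the matching " + indexZip + " is missing");
      }
    }
    return descriptors;
  }
}
```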
@@ -60,8 +60,7 @@ public static String getHdfsStorageDir(DataSegment segment)
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
segment.getVersion().replaceAll(":", "_"),
segment.getShardSpec().getPartitionNum()
segment.getVersion().replaceAll(":", "_")
);
}
}
@@ -31,25 +31,26 @@
public class DataSegmentPusherUtilTest
{
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception
{

Interval interval = new Interval("2011-10-01/2011-10-02");
ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");

DataSegment segment = new DataSegment(
"something",
interval,
"brand:new:version",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
null,
1
"something",
interval,
"brand:new:version",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
null,
1
);

String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir);

}
}
4 changes: 2 additions & 2 deletions docs/content/development/modules.md
@@ -85,7 +85,7 @@ In this way, you can validate both push (at realtime node) and pull (at historic

* DataSegmentPusher

Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see two new files: `descriptor.json` and `index.zip` after your ingestion task ends.
Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see two new files: `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) after your ingestion task ends.

* DataSegmentPuller

@@ -118,7 +118,7 @@ To mark a segment as not used, you need to connect to your metadata storage and

To start a segment killing task, you need to access the old Coordinator console `http://<COORDINATOR_IP>:<COORDINATOR_PORT>/old-console/kill.html`, then select the appropriate datasource and input a time range (e.g. `2000/3000`).

After the killing task ends, both `descriptor.json` and `index.zip` files should be deleted from the data storage.
After the killing task ends, both `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) files should be deleted from the data storage.

### Adding a new Firehose

28 changes: 12 additions & 16 deletions docs/content/operations/insert-segment-to-db.md
@@ -25,7 +25,7 @@ or runtime.properties file. Specifically, this tool needs to know
`druid.storage.type`

Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json. These two can be provided through command line arguments.
update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments.

`--workingDir` (Required)

@@ -36,11 +36,11 @@

`--updateDescriptor` (Optional)

if set to true, this tool will update `loadSpec` field in `descriptor.json` if the path in `loadSpec` is different from
where `descriptor.json` was found. Default value is `true`.
if set to true, this tool will update `loadSpec` field in `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) if the path in `loadSpec` is different from
where `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) was found. Default value is `true`.

Note: you will also need to load different Druid extensions per the metadata and deep storage you use. For example, if you
use `mysql` as metadata storage and `HDFS` as deep storage, you should load `mysql-metadata-storage` and `druid-hdfs-storage`
use `mysql` as metadata storage and HDFS as deep storage, you should load `mysql-metadata-storage` and `druid-hdfs-storage`
extensions.


@@ -54,24 +54,20 @@ Directory path: /druid/storage/wikipedia
├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
└── 2015-10-21T22_07_57.074Z
└── 0
├── descriptor.json
└── index.zip
├── 0_descriptor.json
└── 0_index.zip
```

To load all these segments into `mysql`, you can fire the command below,
@@ -27,6 +27,7 @@
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -79,8 +80,16 @@ public Set<DataSegment> findSegments(String workingDirPathStr, boolean updateDes
while (it.hasNext()) {
final LocatedFileStatus locatedFileStatus = it.next();
final Path path = locatedFileStatus.getPath();
if (path.getName().equals("descriptor.json")) {
final Path indexZip = new Path(path.getParent(), "index.zip");
if (path.getName().endsWith("descriptor.json")) {
final Path indexZip;
final String descriptorParts[] = path.getName().split("_");
if (descriptorParts.length == 2
&& descriptorParts[1].equals("descriptor.json")
&& StringUtils.isNumeric(descriptorParts[0])) {
indexZip = new Path(path.getParent(), String.format("%s_index.zip", descriptorParts[0]));
} else {
indexZip = new Path(path.getParent(), "index.zip");
}
if (fs.exists(indexZip)) {
final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
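
The numeric-prefix check above is what keeps the finder backward compatible: a name that splits into a number plus `descriptor.json` follows the new per-partition naming, while anything else falls back to the legacy `index.zip` sibling. A hypothetical helper, for illustration only:

```java
// Hypothetical helper, for illustration only; it mirrors the naming decision made in the finder above.
public class DescriptorNaming
{
  static String expectedIndexName(String descriptorName)
  {
    final String[] parts = descriptorName.split("_");
    final boolean newStyle = parts.length == 2
                             && "descriptor.json".equals(parts[1])
                             && parts[0].chars().allMatch(Character::isDigit);
    // New layout: "3_descriptor.json" pairs with "3_index.zip".
    // Legacy layout: "descriptor.json" pairs with "index.zip".
    return newStyle ? parts[0] + "_index.zip" : "index.zip";
  }
}
```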
@@ -20,11 +20,11 @@
package io.druid.storage.hdfs;

import com.google.inject.Inject;

import io.druid.java.util.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,7 +33,7 @@

public class HdfsDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(HdfsDataSegmentKiller.class);
private static final EmittingLogger log = new EmittingLogger(HdfsDataSegmentKiller.class);

private static final String PATH_KEY = "path";

@@ -48,42 +48,71 @@ public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPu
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
}

private static Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
}

@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
final Path path = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
final Path segmentPath = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath);

try {
if (path.getName().endsWith(".zip")) {

final FileSystem fs = path.getFileSystem(config);

if (!fs.exists(path)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", path);
return ;
}
String segmentLocation = segmentPath.getName();
final FileSystem fs = segmentPath.getFileSystem(config);
if (!segmentLocation.endsWith(".zip")) {
throw new SegmentLoadingException("Unknown file type[%s]", segmentPath);
} else {

// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
Path partitionNumDir = path.getParent();
if (!fs.delete(partitionNumDir, true)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete dir [%s]",
partitionNumDir.toString()
);
if (!fs.exists(segmentPath)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", segmentPath);
return;
}

//try to delete other directories if possible
Path versionDir = partitionNumDir.getParent();
if (safeNonRecursiveDelete(fs, versionDir)) {
Path intervalDir = versionDir.getParent();
if (safeNonRecursiveDelete(fs, intervalDir)) {
Path dataSourceDir = intervalDir.getParent();
safeNonRecursiveDelete(fs, dataSourceDir);
String[] zipParts = segmentLocation.split("_");
// for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
if (zipParts.length == 2
&& zipParts[1].equals("index.zip")
&& StringUtils.isNumeric(zipParts[0])) {
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(segmentPath.getParent(), String.format("%s_descriptor.json", zipParts[0]));
//delete partitionNumber_descriptor.json
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
// max depth to look is 2, i.e version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 2);

} else { //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/
// index.zip
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json");
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip
//max depth to look is 3, i.e partition number directory,version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 3);
}
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
catch (IOException e) {
@@ -99,18 +128,19 @@ public void killAll() throws IOException
fs.delete(storageDirectory, true);
}

private boolean safeNonRecursiveDelete(FileSystem fs, Path path)
private void mayBeDeleteParentsUpto(final FileSystem fs, final Path segmentPath, final int maxDepthTobeDeleted)
{
Path path = segmentPath;
try {
return fs.delete(path, false);
for (int i = 1; i <= maxDepthTobeDeleted; i++) {
path = path.getParent();
if (fs.listStatus(path).length != 0 || !fs.delete(path, false)) {
break;
}
}
}
catch (Exception ex) {
return false;
catch (Exception e) {
log.makeAlert(e, "uncaught exception during segment killer").emit();
}
}

private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
}
}
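
Because the old and new bodies of `kill()` are interleaved in the hunk above, here is a condensed outline of just the new deletion flow — a sketch against the Hadoop `FileSystem` API with logging, the unknown-file-type check, and the exception wrapping stripped out, not the method as committed:

```java
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Condensed outline of the new kill() flow; error handling and logging are omitted,
// so treat this as a sketch rather than the shipped method.
public class HdfsKillFlowSketch
{
  static void kill(FileSystem fs, Path segmentPath) throws IOException
  {
    final String name = segmentPath.getName(); // e.g. "3_index.zip" (new layout) or "index.zip" (old layout)
    final String[] parts = name.split("_");
    final boolean newLayout = parts.length == 2 && "index.zip".equals(parts[1]) && StringUtils.isNumeric(parts[0]);

    // Delete the index archive and its descriptor file.
    fs.delete(segmentPath, false);
    final String descriptorName = newLayout ? parts[0] + "_descriptor.json" : "descriptor.json";
    fs.delete(new Path(segmentPath.getParent(), descriptorName), false);

    // Walk upwards and remove parents that are now empty:
    //   new layout -> version dir, interval dir                     (depth 2)
    //   old layout -> partitionNum dir, version dir, interval dir   (depth 3)
    Path parent = segmentPath.getParent();
    for (int depth = 1; depth <= (newLayout ? 2 : 3); depth++) {
      if (fs.listStatus(parent).length != 0 || !fs.delete(parent, false)) {
        break;
      }
      parent = parent.getParent();
    }
  }
}
```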
@@ -94,10 +94,11 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
);

Path tmpFile = new Path(String.format(
"%s/%s/%s/index.zip",
"%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory,
segment.getDataSource(),
UUIDUtils.generateUuid()
UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum()
));
FileSystem fs = tmpFile.getFileSystem(hadoopConfig);

Expand All @@ -108,18 +109,21 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
final DataSegment dataSegment;
try (FSDataOutputStream out = fs.create(tmpFile)) {
size = CompressionUtils.zip(inDir, out);
final Path outFile = new Path(String.format(
"%s/%s/index.zip",
Path outFile = new Path(String.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory,
storageDir
storageDir,
segment.getShardSpec().getPartitionNum()
));

final Path outDir = outFile.getParent();
dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
tmpFile.getParent(),
fs
fs,
segment.getShardSpec().getPartitionNum()
);

// Create parent if it does not exist, recreation is not an error
@@ -154,9 +158,9 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
return dataSegment;
}

private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs, final int partitionNumber) throws IOException
{
final Path descriptorFile = new Path(outDir, "descriptor.json");
final Path descriptorFile = new Path(outDir, String.format("%s_descriptor.json", partitionNumber));
log.info("Creating descriptor file at[%s]", descriptorFile);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(segment))
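
Putting the pusher changes together: the zip is first written to a UUID-scoped staging path and then moved to its final location, with the partition number prefixed onto both the index and the descriptor file names. A worked example of the resulting paths (every value below is made up for illustration):

```java
// Worked example of the paths built by the updated push() / createDescriptorFile(); all values are made up.
public class HdfsPushPathsExample
{
  public static void main(String[] args)
  {
    final String storageDirectory = "hdfs://nn1:8020/druid/segments";
    final String dataSource = "wikipedia";
    final String storageDir = "wikipedia/20130831T000000.000Z_20130901T000000.000Z/2015-10-21T22_07_57.074Z";
    final String uuid = "some-generated-uuid";   // stands in for UUIDUtils.generateUuid()
    final int partitionNum = 3;

    // Staging file the zip is written to first:
    System.out.println(String.format("%s/%s/%s/%d_index.zip", storageDirectory, dataSource, uuid, partitionNum));
    // Final index and descriptor locations, both carrying the partition number as a prefix:
    System.out.println(String.format("%s/%s/%d_index.zip", storageDirectory, storageDir, partitionNum));
    System.out.println(String.format("%s/%s/%d_descriptor.json", storageDirectory, storageDir, partitionNum));
  }
}
```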