Skip to content

Commit

Permalink
HdfsDataSegmentPusher bug fix (apache#4003)
Browse files Browse the repository at this point in the history
* Fix for HdfsDataSegmentPusher.

* Add missing loadspec in actual descriptor file. Tests to check actual content of descriptor file.
  • Loading branch information
akashdw authored and gianm committed Mar 6, 2017
1 parent df623eb commit bebf9f3
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,79 +93,87 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
storageDir
);

Path tmpFile = new Path(String.format(
Path tmpIndexFile = new Path(String.format(
"%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory,
segment.getDataSource(),
UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum()
));
FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);

fs.mkdirs(tmpFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, tmpFile);
fs.mkdirs(tmpIndexFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);

final long size;
final DataSegment dataSegment;
try (FSDataOutputStream out = fs.create(tmpFile)) {
try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
size = CompressionUtils.zip(inDir, out);
Path outFile = new Path(String.format(
final Path outIndexFile = new Path(String.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory,
storageDir,
segment.getShardSpec().getPartitionNum()
));

final Path outDir = outFile.getParent();
dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
tmpFile.getParent(),
fs,
final Path outDescriptorFile = new Path(String.format(
"%s/%s/%d_descriptor.json",
fullyQualifiedStorageDirectory,
storageDir,
segment.getShardSpec().getPartitionNum()
));

dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));

final Path tmpDescriptorFile = new Path(
tmpIndexFile.getParent(),
String.format("%s_descriptor.json", dataSegment.getShardSpec().getPartitionNum())
);

log.info("Creating descriptor file at[%s]", tmpDescriptorFile);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(dataSegment))
.copyTo(new HdfsOutputStreamSupplier(fs, tmpDescriptorFile));

// Create parent if it does not exist, recreation is not an error
fs.mkdirs(outDir.getParent());
if (!HadoopFsWrapper.rename(fs, tmpFile.getParent(), outDir)) {
if (fs.exists(outDir)) {
log.info(
"Unable to rename temp directory[%s] to segment directory[%s]. It is already pushed by a replica task.",
tmpFile.getParent(),
outDir
);
} else {
throw new IOException(String.format(
"Failed to rename temp directory[%s] and segment directory[%s] is not present.",
tmpFile.getParent(),
outDir
));
}
}
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
}
finally {
try {
if (fs.exists(tmpFile.getParent()) && !fs.delete(tmpFile.getParent(), true)) {
log.error("Failed to delete temp directory[%s]", tmpFile.getParent());
if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
log.error("Failed to delete temp directory[%s]", tmpIndexFile.getParent());
}
}
catch (IOException ex) {
log.error(ex, "Failed to delete temp directory[%s]", tmpFile.getParent());
log.error(ex, "Failed to delete temp directory[%s]", tmpIndexFile.getParent());
}
}

return dataSegment;
}

private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs, final int partitionNumber) throws IOException
private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
{
final Path descriptorFile = new Path(outDir, String.format("%s_descriptor.json", partitionNumber));
log.info("Creating descriptor file at[%s]", descriptorFile);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(segment))
.copyTo(new HdfsOutputStreamSupplier(fs, descriptorFile));
return segment;
if (!HadoopFsWrapper.rename(fs, from, to)) {
if (fs.exists(to)) {
log.info(
"Unable to rename temp Index file[%s] to final segment path [%s]. "
+ "It is already pushed by a replica task.",
from,
to
);
} else {
throw new IOException(String.format(
"Failed to rename temp Index file[%s] and final segment path[%s] is not present.",
from,
to
));
}
}
}

private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@

package io.druid.storage.hdfs;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -27,6 +38,7 @@
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -50,6 +62,8 @@ public class HdfsDataSegmentPusherTest
@Rule
public final ExpectedException expectedException = ExpectedException.none();

TestObjectMapper objectMapper = new TestObjectMapper();

@Test
public void testPushWithScheme() throws Exception
{
Expand All @@ -73,6 +87,12 @@ public void testPushWithoutScheme() throws Exception
testUsingScheme(null);
}

@Test
public void testPushWithMultipleSegments() throws Exception
{
testUsingSchemeForMultipleSegments("file", 3);
}

private void testUsingScheme(final String scheme) throws Exception
{
Configuration conf = new Configuration(true);
Expand Down Expand Up @@ -153,4 +173,148 @@ private void testUsingScheme(final String scheme) throws Exception
Assert.fail("should not throw exception");
}
}

private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
{
Configuration conf = new Configuration(true);
DataSegment[] segments = new DataSegment[numberOfSegments];

// Create a mock segment on disk
File segmentDir = tempFolder.newFolder();
File tmp = new File(segmentDir, "version.bin");

final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;

HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
final File storageDirectory = tempFolder.newFolder();

config.setStorageDirectory(
scheme != null
? String.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
: storageDirectory.getAbsolutePath()
);
HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());

for (int i = 0; i < numberOfSegments; i++) {
segments[i] = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NumberedShardSpec(i, i),
0,
size
);
}

for (int i = 0; i < numberOfSegments; i++) {
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i]);

String indexUri = String.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segments[i]),
segments[i].getShardSpec().getPartitionNum()
);

Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
Assert.assertEquals(segments[i], pushedSegment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
indexUri
), pushedSegment.getLoadSpec());
// rename directory after push
String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment);

File indexFile = new File(String.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
pushedSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(String.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
pushedSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());

//read actual data from descriptor file.
DataSegment fromDescriptorFileDataSegment = objectMapper.readValue(descriptorFile, DataSegment.class);

Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
Assert.assertEquals(segments[i], pushedSegment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
indexUri
), fromDescriptorFileDataSegment.getLoadSpec());
// rename directory after push
segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment);

indexFile = new File(String.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
fromDescriptorFileDataSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());


// push twice will fail and temp dir cleaned
File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segments[i]);
}
catch (IOException e) {
Assert.fail("should not throw exception");
}
}
}

public class TestObjectMapper extends ObjectMapper
{
public TestObjectMapper()
{
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_FIELDS, false);
configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
configure(SerializationFeature.INDENT_OUTPUT, false);
registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec")));
}

public class TestModule extends SimpleModule
{
TestModule()
{
addSerializer(Interval.class, ToStringSerializer.instance);
addSerializer(NumberedShardSpec.class, ToStringSerializer.instance);
addDeserializer(
Interval.class, new StdDeserializer<Interval>(Interval.class)
{
@Override
public Interval deserialize(
JsonParser jsonParser, DeserializationContext deserializationContext
) throws IOException, JsonProcessingException
{
return new Interval(jsonParser.getText());
}
}
);
}
}
}

}

0 comments on commit bebf9f3

Please sign in to comment.