Skip to content

Commit

Permalink
Fix bug with SegmentLoaderLocalCacheManager (apache#3929)
Browse files Browse the repository at this point in the history
* Fix bug with SegmentLoaderLocalCacheManager

* Use collections.sort
  • Loading branch information
niketh authored and gianm committed Feb 23, 2017
1 parent 2ead572 commit 02fc625
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package io.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.metamx.common.ISE;
Expand All @@ -33,8 +33,10 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.List;

/**
*/
Expand All @@ -46,10 +48,18 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;

private final SortedSet<StorageLocation> locations;
private final List<StorageLocation> locations;

private final Object lock = new Object();

private static final Comparator<StorageLocation> COMPARATOR = new Comparator<StorageLocation>()
{
@Override public int compare(StorageLocation left, StorageLocation right)
{
return Longs.compare(right.available(), left.available());
}
};

@Inject
public SegmentLoaderLocalCacheManager(
IndexIO indexIO,
Expand All @@ -61,15 +71,7 @@ public SegmentLoaderLocalCacheManager(
this.config = config;
this.jsonMapper = mapper;

this.locations = Sets.newTreeSet(new Comparator<StorageLocation>()
{
@Override
public int compare(StorageLocation left, StorageLocation right)
{
// sorted from empty to full
return Longs.compare(right.available(), left.available());
}
});
this.locations = Lists.newArrayList();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize()));
}
Expand All @@ -88,7 +90,7 @@ public boolean isSegmentLoaded(final DataSegment segment)

public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
Expand Down Expand Up @@ -138,7 +140,7 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
for (StorageLocation loc : getSortedList(locations)) {
// locIter is ordered from empty to full
if (!loc.canHandle(segment.getSize())) {
throw new ISE(
Expand Down Expand Up @@ -230,7 +232,7 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : locations) {
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
Expand Down Expand Up @@ -270,4 +272,12 @@ public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException
}
}
}

public List<StorageLocation> getSortedList(List<StorageLocation> locs)
{
List<StorageLocation> locations = new ArrayList<>(locs);
Collections.sort(locations, COMPARATOR);

return locations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,83 @@ public void testRetryAllFail() throws Exception
manager.cleanup(segmentToDownload);
}

@Test
public void testEmptyToFullOrder() throws Exception
{
final List<StorageLocationConfig> locations = Lists.newArrayList();
final StorageLocationConfig locationConfig = new StorageLocationConfig();
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
localStorageFolder.setWritable(true);
locationConfig.setPath(localStorageFolder);
locationConfig.setMaxSize(10L);
locations.add(locationConfig);
final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
localStorageFolder2.setWritable(true);
locationConfig2.setPath(localStorageFolder2);
locationConfig2.setMaxSize(10L);
locations.add(locationConfig2);

manager = new SegmentLoaderLocalCacheManager(
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder");
final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
final File localSegmentFile = new File(
segmentSrcFolder,
"test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
);
localSegmentFile.mkdirs();
final File indexZip = new File(localSegmentFile, "index.zip");
indexZip.createNewFile();

Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentLoaded(segmentToDownload));

File segmentFile = manager.getSegmentFiles(segmentToDownload);
Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload));

final DataSegment segmentToDownload2 = dataSegmentWithInterval("2014-11-20T00:00:00Z/P1D").withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
segmentSrcFolder.getCanonicalPath()
+ "/test_segment_loader"
+ "/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z"
+ "/0/index.zip"
)
);
// manually create a local segment under segmentSrcFolder
final File localSegmentFile2 = new File(
segmentSrcFolder,
"test_segment_loader/2014-11-20T00:00:00.000Z_2014-11-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
);
localSegmentFile2.mkdirs();
final File indexZip2 = new File(localSegmentFile2, "index.zip");
indexZip2.createNewFile();

File segmentFile2 = manager.getSegmentFiles(segmentToDownload2);
Assert.assertTrue(segmentFile2.getAbsolutePath().contains("/local_storage_folder2/"));
Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentLoaded(segmentToDownload2));

manager.cleanup(segmentToDownload2);
Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentLoaded(segmentToDownload2));
}

private DataSegment dataSegmentWithInterval(String intervalStr)
{
return DataSegment.builder()
Expand All @@ -312,7 +389,7 @@ private DataSegment dataSegmentWithInterval(String intervalStr)
.metrics(ImmutableList.<String>of())
.shardSpec(NoneShardSpec.instance())
.binaryVersion(9)
.size(0)
.size(10L)
.build();
}
}

0 comments on commit 02fc625

Please sign in to comment.