Skip to content

Commit

Permalink
retry loadSegment with all locations (apache#3681)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijianding authored and b-slim committed Dec 6, 2016
1 parent 5440a06 commit f995b14
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package io.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;

import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
Expand All @@ -34,20 +34,20 @@

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Comparator;
import java.util.SortedSet;

/**
*/
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final Logger log = new Logger(SegmentLoaderLocalCacheManager.class);
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);

private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;

private final List<StorageLocation> locations;
private final SortedSet<StorageLocation> locations;

private final Object lock = new Object();

Expand All @@ -62,7 +62,15 @@ public SegmentLoaderLocalCacheManager(
this.config = config;
this.jsonMapper = mapper;

this.locations = Lists.newArrayList();
this.locations = Sets.newTreeSet(new Comparator<StorageLocation>()
{
@Override
public int compare(StorageLocation left, StorageLocation right)
{
// sorted from empty to full
return Longs.compare(right.available(), left.available());
}
});
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize()));
}
Expand Down Expand Up @@ -103,61 +111,90 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);

final File retVal;
String storageDir = DataSegmentPusherUtil.getStorageDir(segment);

if (loc == null) {
Iterator<StorageLocation> locIter = locations.iterator();
loc = locIter.next();
while (locIter.hasNext()) {
loc = loc.mostEmpty(locIter.next());
}
loc = loadSegmentWithRetry(segment, storageDir);
}
loc.addSegment(segment);
return new File(loc.getPath(), storageDir);
}

/**
* location may fail because of IO failure, most likely in two cases:<p>
* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
* 2. disk failure, druid can't read/write to this disk anymore
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
for (StorageLocation loc : locations) {
// locIter is ordered from empty to full
if (!loc.canHandle(segment.getSize())) {
throw new ISE(
"Segment[%s:%,d] too large for storage[%s:%,d].",
segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
);
}
File storageDir = new File(loc.getPath(), storageDirStr);

File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
try {
loadInLocationWithStartMarker(segment, storageDir);
return loc;
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment in current location %s, try next location if any", loc.getPath().getAbsolutePath())
.addData("location", loc.getPath().getAbsolutePath())
.emit();

// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
synchronized (lock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
try {
if (!downloadStartMarker.createNewFile()) {
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
}
cleanupCacheFiles(loc.getPath(), storageDir);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
catch (IOException e1) {
log.error(e1, "Failed to cleanup location " + storageDir.getAbsolutePath());
}
}
}
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getIdentifier());
}

// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
if(result.getSize() != segment.getSize()){
log.warn("Segment [%s] is different than expected size. Expected [%d] found [%d]", segment.getIdentifier(), segment.getSize(), result.getSize());
private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
synchronized (lock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}

if (!downloadStartMarker.delete()) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
try {
if (!downloadStartMarker.createNewFile()) {
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
}

retVal = storageDir;
} else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
}
loadInLocation(segment, storageDir);

loc.addSegment(segment);
if (!downloadStartMarker.delete()) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
}
}

return retVal;
private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
if (result.getSize() != segment.getSize()) {
log.warn(
"Segment [%s] is different than expected size. Expected [%d] found [%d]",
segment.getIdentifier(),
segment.getSize(),
result.getSize()
);
}
}

@Override
Expand All @@ -175,11 +212,19 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException
}

try {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
cleanupCacheFiles(loc.getPath(), cacheFile);
loc.removeSegment(segment);
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
File cacheFile = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
cleanupCacheFiles(location.getPath(), cacheFile);
location.removeSegment(segment);
}
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
Expand All @@ -202,8 +247,12 @@ public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException
}
}

if (cacheFile.getParentFile() != null && cacheFile.getParentFile().listFiles().length == 0) {
cleanupCacheFiles(baseFile, cacheFile.getParentFile());
File parent = cacheFile.getParentFile();
if (parent != null) {
File[] children = parent.listFiles();
if (children == null || children.length == 0) {
cleanupCacheFiles(baseFile, parent);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,4 @@ synchronized long available()
{
return maxSize - currSize;
}

StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
}
Loading

0 comments on commit f995b14

Please sign in to comment.