TmpFileIOPeons to create files under the merging output directory, instead of java.io.tmpdir (apache#3990)

* In IndexMerger and IndexMergerV9, create temporary files under the output directory/tmpPeonFiles, instead of java.io.tmpdir

* Use FileUtils.forceMkdir() across the codebase and remove some unused code

* Fix test

* Fix PullDependencies.run()

* Unused import
leventov authored and gianm committed Mar 2, 2017
1 parent ea1f5b7 commit 81a5f98
Showing 16 changed files with 87 additions and 243 deletions.
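
The headline change named in the first bullet — making IndexMerger and IndexMergerV9 create their scratch files under <output directory>/tmpPeonFiles rather than java.io.tmpdir — lives in files that are not rendered in the truncated listing below. A minimal, hypothetical sketch of the idea follows; only the tmpPeonFiles directory name comes from the commit message, and the class, method, and file-name prefix here are illustrative, not the actual Druid code:

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

class MergeScratchFilesSketch
{
  // Create a scratch file next to the merge output instead of under java.io.tmpdir,
  // so temporary data lands on the same volume as the result and is cleaned up with
  // the output directory rather than lingering in the system temp dir.
  static File createTmpPeonFile(File outputDir) throws IOException
  {
    File tmpPeonFiles = new File(outputDir, "tmpPeonFiles");
    FileUtils.forceMkdir(tmpPeonFiles);
    return File.createTempFile("filePeon", ".tmp", tmpPeonFiles);
  }
}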
@@ -19,11 +19,10 @@
 
 package io.druid.storage.azure;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
-
 import io.druid.java.util.common.CompressionUtils;
-import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.MapUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.loading.DataSegmentPuller;
@@ -56,9 +55,8 @@ public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(
   )
       throws SegmentLoadingException
   {
-    prepareOutDir(outDir);
-
     try {
+      prepareOutDir(outDir);

       final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
       final io.druid.java.util.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
@@ -99,15 +97,9 @@ public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoad
     getSegmentFiles(containerName, blobPath, outDir);
   }

-  public void prepareOutDir(final File outDir) throws ISE
+  @VisibleForTesting
+  void prepareOutDir(final File outDir) throws IOException
   {
-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-
-    if (!outDir.isDirectory()) {
-      throw new ISE("[%s] must be a directory.", outDir);
-    }
-
+    FileUtils.forceMkdir(outDir);
   }
 }
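
The prepareOutDir() rewrite above is the pattern this commit applies across the pullers and elsewhere: the hand-rolled exists()/mkdirs()/isDirectory() sequence, which can fail silently, becomes a single call into Apache Commons IO. A small standalone before/after sketch, with java.lang.IllegalStateException standing in for Druid's ISE:

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

class ForceMkdirSketch
{
  // Old pattern: mkdirs() returns false on failure (and when the directory already
  // exists), so a separate isDirectory() check and a hand-written exception are needed.
  static void prepareOutDirOld(File outDir)
  {
    if (!outDir.exists()) {
      outDir.mkdirs();
    }
    if (!outDir.isDirectory()) {
      throw new IllegalStateException(String.format("[%s] must be a directory.", outDir));
    }
  }

  // New pattern: forceMkdir() creates the directory and any missing parents, and throws
  // IOException if creation fails or the path already exists as a regular file.
  static void prepareOutDirNew(File outDir) throws IOException
  {
    FileUtils.forceMkdir(outDir);
  }
}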
@@ -19,11 +19,11 @@
 
 package io.druid.storage.google;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.inject.Inject;
 import io.druid.java.util.common.CompressionUtils;
 import io.druid.java.util.common.FileUtils;
-import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.MapUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.loading.DataSegmentPuller;
@@ -64,9 +64,9 @@ public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final Strin
   {
     LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath());

-    prepareOutDir(outDir);
-
     try {
+      prepareOutDir(outDir);
+
       final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
       final FileUtils.FileCopyResult result = CompressionUtils.unzip(
           byteSource,
@@ -91,16 +91,10 @@ public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final Strin
     }
   }

-  // Needs to be public for the tests.
-  public void prepareOutDir(final File outDir) throws ISE
+  @VisibleForTesting
+  void prepareOutDir(final File outDir) throws IOException
   {
-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-
-    if (!outDir.isDirectory()) {
-      throw new ISE("outDir[%s] must be a directory.", outDir);
-    }
+    org.apache.commons.io.FileUtils.forceMkdir(outDir);
   }

   @Override
@@ -25,11 +25,9 @@
 import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
 import com.google.inject.Inject;
-
 import io.druid.java.util.common.CompressionUtils;
 import io.druid.java.util.common.FileUtils;
 import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.MapUtils;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.UOE;
@@ -38,6 +36,12 @@
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.segment.loading.URIDataPuller;
 import io.druid.timeline.DataSegment;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.StorageObject;
+
+import javax.tools.FileObject;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,11 +51,6 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import javax.tools.FileObject;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.StorageObject;
 
 /**
  * A data segment puller that also hanldes URI data pulls.
@@ -176,15 +175,9 @@ public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final F
       throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
     }

-    if (!outDir.exists()) {
-      outDir.mkdirs();
-    }
-
-    if (!outDir.isDirectory()) {
-      throw new ISE("outDir[%s] must be a directory.", outDir);
-    }
-
     try {
+      org.apache.commons.io.FileUtils.forceMkdir(outDir);
+
       final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
       final ByteSource byteSource = new ByteSource()
       {
@@ -528,9 +528,7 @@ protected void map(
       context.setStatus("CONVERTING");
       context.progress();
       final File outDir = new File(tmpDir, "out");
-      if (!outDir.mkdir() && (!outDir.exists() || !outDir.isDirectory())) {
-        throw new IOException(String.format("Could not create output directory [%s]", outDir));
-      }
+      FileUtils.forceMkdir(outDir);
       HadoopDruidConverterConfig.INDEX_MERGER.convert(
           inDir,
           outDir,
@@ -25,75 +25,15 @@
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
 
 /**
  */
-public class
-StreamUtils
+public class StreamUtils
 {
-  // The default buffer size to use (from IOUtils)
-  private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
-
-  /**
-   * Copy from an input stream to a file (and buffer it) and close the input stream.
-   * <p/>
-   * It is highly recommended to use FileUtils.retryCopy whenever possible, and not use a raw `InputStream`
-   *
-   * @param is   The input stream to copy bytes from. `is` is closed regardless of the copy result.
-   * @param file The file to copy bytes to. Any parent directories are automatically created.
-   *
-   * @return The count of bytes written to the file
-   *
-   * @throws IOException
-   */
-  public static long copyToFileAndClose(InputStream is, File file) throws IOException
-  {
-    file.getParentFile().mkdirs();
-    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
-      final long result = ByteStreams.copy(is, os);
-      // Workarround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
-      os.flush();
-      return result;
-    }
-    finally {
-      is.close();
-    }
-  }
-
-  /**
-   * Copy bytes from `is` to `file` but timeout if the copy takes too long. The timeout is best effort and not
-   * guaranteed. Specifically, `is.read` will not be interrupted.
-   *
-   * @param is      The `InputStream` to copy bytes from. It is closed regardless of copy results.
-   * @param file    The `File` to copy bytes to
-   * @param timeout The timeout (in ms) of the copy.
-   *
-   * @return The size of bytes written to `file`
-   *
-   * @throws IOException
-   * @throws TimeoutException If `timeout` is exceeded
-   */
-  public static long copyToFileAndClose(InputStream is, File file, long timeout) throws IOException, TimeoutException
-  {
-    file.getParentFile().mkdirs();
-    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
-      final long retval = copyWithTimeout(is, os, timeout);
-      // Workarround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
-      os.flush();
-      return retval;
-    }
-    finally {
-      is.close();
-    }
-  }

   /**
    * Copy from `is` to `os` and close the streams regardless of the result.
@@ -119,35 +59,6 @@ public static long copyAndClose(InputStream is, OutputStream os) throws IOExcept
     }
   }

-  /**
-   * Copy from the input stream to the output stream and tries to exit if the copy exceeds the timeout. The timeout
-   * is best effort. Specifically, `is.read` will not be interrupted.
-   *
-   * @param is      The input stream to read bytes from.
-   * @param os      The output stream to write bytes to.
-   * @param timeout The timeout (in ms) for the copy operation
-   *
-   * @return The total size of bytes written to `os`
-   *
-   * @throws IOException
-   * @throws TimeoutException If `tiemout` is exceeded
-   */
-  public static long copyWithTimeout(InputStream is, OutputStream os, long timeout) throws IOException, TimeoutException
-  {
-    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
-    int n;
-    long startTime = System.currentTimeMillis();
-    long size = 0;
-    while (-1 != (n = is.read(buffer))) {
-      if (System.currentTimeMillis() - startTime > timeout) {
-        throw new TimeoutException(String.format("Copy time has exceeded %,d millis", timeout));
-      }
-      os.write(buffer, 0, n);
-      size += n;
-    }
-    return size;
-  }
-
   /**
    * Retry copy attempts from input stream to output stream. Does *not* check to make sure data was intact during the transfer
    *
@@ -23,6 +23,7 @@
 import com.google.common.collect.Sets;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.logger.Logger;
+import org.apache.commons.io.FileUtils;
 
 import java.io.Closeable;
 import java.io.File;
@@ -78,9 +79,7 @@ public LimitedOutputStream createFile() throws IOException
       throw new ISE("Closed");
     }

-    if (!storageDirectory.exists() && !storageDirectory.mkdir()) {
-      throw new IOException(String.format("Cannot create storageDirectory: %s", storageDirectory));
-    }
+    FileUtils.forceMkdir(storageDirectory);

     final File theFile = new File(storageDirectory, String.format("%08d.tmp", files.size()));
     final EnumSet<StandardOpenOption> openOptions = EnumSet.of(
3 changes: 2 additions & 1 deletion processing/src/main/java/io/druid/segment/IndexIO.java
@@ -81,6 +81,7 @@
 import io.druid.segment.serde.LongGenericColumnPartSerde;
 import io.druid.segment.serde.LongGenericColumnSupplier;
 import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
+import org.apache.commons.io.FileUtils;
 import org.joda.time.Interval;
 
 import java.io.ByteArrayOutputStream;
@@ -502,7 +503,7 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
     try {
       SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir));

-      v9Dir.mkdirs();
+      FileUtils.forceMkdir(v9Dir);
       final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir));

       ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin")));