Skip to content

Commit

Permalink
remove datasource from hadoop output path (apache#3196)
Browse files Browse the repository at this point in the history
fixes apache#2083, follow-up to apache#1702
  • Loading branch information
xvrl authored and drcrallen committed Jun 29, 2016
1 parent 4c9aeb7 commit 485e381
Show file tree
Hide file tree
Showing 16 changed files with 120 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

/**
 * Pushes segment files to deep storage and reports the deep-storage path
 * that Hadoop indexing tasks should write to.
 */
public interface DataSegmentPusher
{
  /**
   * @deprecated the dataSource is no longer part of the Hadoop output path;
   * use {@link #getPathForHadoop()} instead. The argument is ignored by
   * implementations.
   */
  @Deprecated
  String getPathForHadoop(String dataSource);

  /**
   * Returns the deep-storage base path for Hadoop indexing output.
   * NOTE(review): some implementations return null or throw
   * UnsupportedOperationException when Hadoop output is unsupported —
   * callers must handle both.
   */
  String getPathForHadoop();

  /**
   * Pushes the segment files in {@code file} to deep storage and returns the
   * updated segment metadata.
   *
   * @throws IOException on deep-storage write failure
   */
  DataSegment push(File file, DataSegment segment) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ public AzureDataSegmentPusher(
this.jsonMapper = jsonMapper;
}

// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// NOTE(review): returns null — presumably Azure deep storage provides no Hadoop
// working path; confirm callers tolerate a null path.
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,24 @@ public CassandraDataSegmentPusher(
}

@Override
public String getPathForHadoop(String dataSource)
public String getPathForHadoop()
{
throw new UnsupportedOperationException("Cassandra storage does not support indexing via Hadoop");
}

@Deprecated
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,18 @@ public CloudFilesDataSegmentPusher(
}

@Override
public String getPathForHadoop(final String dataSource)
public String getPathForHadoop()
{
return null;
}

// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(final String dataSource)
{
return getPathForHadoop();
}

@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,15 @@ public HdfsDataSegmentPusher(
log.info("Configured HDFS as deep storage");
}

// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Base HDFS storage directory as a URI string; no longer scoped per dataSource.
return new Path(config.getStorageDirectory()).toUri().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,17 @@ public S3DataSegmentPusher(
log.info("Configured S3 as deep storage");
}

@Override
public String getPathForHadoop()
{
// s3n-scheme URI rooted at the configured bucket and base key; no longer
// scoped per dataSource.
return "s3n://" + config.getBucket() + "/" + config.getBaseKey();
}

// Deprecated variant: the per-dataSource suffix is no longer part of the
// Hadoop output path; delegates to getPathForHadoop().
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
new String[]{
toolbox.getObjectMapper().writeValueAsString(spec),
toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
toolbox.getSegmentPusher().getPathForHadoop()
},
loader
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,17 @@ private DataSegment generateSegment(
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
// Deprecated variant: dataSource is ignored; delegates to the parameterless
// overload, which forwards to the wrapped pusher.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Forward to the toolbox's real segment pusher for the deep-storage path.
return toolbox.getSegmentPusher().getPathForHadoop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,15 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOExcepti
}
}, null, new DataSegmentPusher()
{
// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Stub pusher for tests: no Hadoop path is provided.
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,15 @@ public void deleteSegments(Set<DataSegment> segments)
newMockEmitter(),
new DataSegmentPusher()
{
// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Test pusher: Hadoop paths are intentionally unsupported.
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,18 @@ private DataSegmentPusher setUpDataSegmentPusher()
return new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
public String getPathForHadoop()
{
throw new UnsupportedOperationException();
}

// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
Expand Down Expand Up @@ -993,8 +1000,15 @@ public void testRealtimeIndexTaskFailure() throws Exception
{
dataSegmentPusher = new DataSegmentPusher()
{
// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String s)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Test pusher: Hadoop paths are intentionally unsupported.
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ public class TestDataSegmentPusher implements DataSegmentPusher
{
private final Set<DataSegment> pushedSegments = Sets.newConcurrentHashSet();

// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Test pusher: Hadoop paths are intentionally unsupported.
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,17 @@ public LocalDataSegmentPusher(
log.info("Configured local filesystem as deep storage");
}

@Override
public String getPathForHadoop()
{
// Absolute URI of the local storage directory; no longer scoped per dataSource.
final File storageDir = config.getStorageDirectory().getAbsoluteFile();
return storageDir.toURI().toString();
}

// Deprecated variant: the per-dataSource subdirectory is no longer part of the
// Hadoop output path; delegates to getPathForHadoop().
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public void testPathForHadoopAbsolute()
config.storageDirectory = new File("/druid");

Assert.assertEquals(
"file:/druid/foo",
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo")
"file:/druid",
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
);
}

Expand All @@ -131,8 +131,8 @@ public void testPathForHadoopRelative()
config.storageDirectory = new File("druid");

Assert.assertEquals(
String.format("file:%s/druid/foo", System.getProperty("user.dir")),
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop("foo")
String.format("file:%s/druid", System.getProperty("user.dir")),
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,15 @@ public int columnCacheSizeBytes()
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
// Deprecated variant: the dataSource argument is ignored; delegates to the
// parameterless overload.
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
public String getPathForHadoop()
{
// Test pusher: Hadoop paths are intentionally unsupported.
throw new UnsupportedOperationException();
}
Expand Down
10 changes: 9 additions & 1 deletion services/src/main/java/io/druid/cli/CliRealtimeExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,18 @@ public Iterable<DruidServer> getInventory()

private static class NoopDataSegmentPusher implements DataSegmentPusher
{

@Override
public String getPathForHadoop()
{
// No-op pusher: return a fixed placeholder path.
return "noop";
}

// Deprecated variant: dataSource is ignored; delegates to the parameterless
// overload (which returns the fixed "noop" placeholder).
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}

@Override
Expand Down

0 comments on commit 485e381

Please sign in to comment.