reset-cluster command to clean up druid state stored on metadata and deep storage (apache#3670)
himanshug authored and pjain1 committed Nov 9, 2016
1 parent 575aeb8 commit b76b3f8
Showing 28 changed files with 509 additions and 14 deletions.
@@ -21,9 +21,13 @@

import io.druid.timeline.DataSegment;

import java.io.IOException;

/**
*/
public interface DataSegmentKiller
{
public void kill(DataSegment segments) throws SegmentLoadingException;
void kill(DataSegment segments) throws SegmentLoadingException;
void killAll() throws IOException;

}
6 changes: 6 additions & 0 deletions api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java
@@ -42,4 +42,10 @@ public void pushTaskLog(String taskid, File logFile) throws IOException
{
log.info("Not pushing logs for task: %s", taskid);
}

@Override
public void killAll() throws IOException
{
log.info("Noop: No task logs are deleted.");
}
}
29 changes: 29 additions & 0 deletions api/src/main/java/io/druid/tasklogs/TaskLogKiller.java
@@ -0,0 +1,29 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.tasklogs;

import java.io.IOException;

/**
*/
public interface TaskLogKiller
{
void killAll() throws IOException;
}
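The concrete implementations live in the individual storage modules later in this diff (and in files not shown here). As a rough illustration of the contract only, a minimal local-disk implementation might look like the sketch below; the `LocalTaskLogKiller` class and its `logDirectory` field are hypothetical and not part of this commit.

```
package io.druid.tasklogs;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;

// Hypothetical sketch: not part of this commit.
public class LocalTaskLogKiller implements TaskLogKiller
{
  // Directory holding one log file per task; assumed to come from configuration.
  private final File logDirectory;

  public LocalTaskLogKiller(File logDirectory)
  {
    this.logDirectory = logDirectory;
  }

  @Override
  public void killAll() throws IOException
  {
    // Recursively deletes the whole log directory; it can be recreated before the next run.
    FileUtils.deleteDirectory(logDirectory);
  }
}
```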
2 changes: 1 addition & 1 deletion api/src/main/java/io/druid/tasklogs/TaskLogs.java
@@ -19,6 +19,6 @@

package io.druid.tasklogs;

public interface TaskLogs extends TaskLogStreamer, TaskLogPusher
public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller
{
}
@@ -53,4 +53,6 @@ byte[] lookup(
void createAuditTable();

void createSupervisorsTable();

void deleteAllRecords(String tableName);
}
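The SQL-backed implementation of this new method is not shown in this excerpt. As a hedged sketch, a JDBI-based connector could satisfy it with a plain DELETE against the given table; the `MetadataTableCleaner` helper below is illustrative only, and its wiring (a `DBI` handed in by the caller) is an assumption.

```
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

// Hypothetical sketch of the shape an SQL-backed deleteAllRecords(...) might take.
public class MetadataTableCleaner
{
  private final DBI dbi;

  public MetadataTableCleaner(DBI dbi)
  {
    this.dbi = dbi;
  }

  public void deleteAllRecords(final String tableName)
  {
    dbi.withHandle(
        new HandleCallback<Void>()
        {
          @Override
          public Void withHandle(Handle handle) throws Exception
          {
            // Remove every row but keep the table itself, so the schema survives the reset.
            handle.createStatement(String.format("DELETE FROM %s", tableName)).execute();
            return null;
          }
        }
    );
  }
}
```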
@@ -178,4 +178,19 @@ public String getSupervisorTable()
{
return supervisorTable;
}

public String getTasksTable()
{
return tasksTable;
}

public String getTaskLogTable()
{
return taskLogTable;
}

public String getTaskLockTable()
{
return taskLockTable;
}
}
55 changes: 55 additions & 0 deletions docs/content/operations/reset-cluster.md
@@ -0,0 +1,55 @@
---
layout: doc_page
---
# ResetCluster tool

The ResetCluster tool can be used to completely wipe out Druid cluster state stored in metadata and deep storage. It is
intended for dev/test environments where you typically want to reset the cluster before running
the test suite.
ResetCluster automatically figures out the necessary information from the Druid cluster configuration, so the Java classpath
used in the command must include all the necessary Druid configuration files.

It can be run in one of the following ways.

```
java io.druid.cli.Main tools reset-cluster [--metadataStore] [--segmentFiles] [--taskLogs] [--hadoopWorkingPath]
```

or

```
java io.druid.cli.Main tools reset-cluster --all
```

# Further Description
Usage documentation can be printed by running the following command.

```
java io.druid.cli.Main help tools reset-cluster
```

```
NAME
druid tools reset-cluster - Cleanup all persisted state from metadata
and deep storage.
SYNOPSIS
druid tools reset-cluster [--all] [--hadoopWorkingPath]
[--metadataStore] [--segmentFiles] [--taskLogs]
OPTIONS
--all
delete all state stored in metadata and deep storage
--hadoopWorkingPath
delete hadoopWorkingPath
--metadataStore
delete all records in metadata storage
--segmentFiles
delete all segment files from deep storage
--taskLogs
delete all tasklogs
```
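
For example, assuming the Druid jars live under `lib/` and the common runtime properties under `conf/druid/_common` (both paths are illustrative and depend on your installation), wiping only metadata records and segment files might look like:

```
java -classpath "lib/*:conf/druid/_common" io.druid.cli.Main tools reset-cluster --metadataStore --segmentFiles
```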
@@ -28,6 +28,7 @@
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Map;
@@ -67,4 +68,10 @@ public void kill(DataSegment segment) throws SegmentLoadingException
}
}

@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}

}
@@ -114,4 +114,10 @@ public InputStream openStream() throws IOException {
private String getTaskLogKey(String taskid) {
return String.format("%s/%s/log", config.getPrefix(), taskid);
}

@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}
@@ -39,10 +39,13 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller

private final Configuration config;

private final Path storageDirectory;

@Inject
public HdfsDataSegmentKiller(final Configuration config)
public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
{
this.config = config;
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
}

@Override
@@ -88,6 +91,14 @@ public void kill(DataSegment segment) throws SegmentLoadingException
}
}

@Override
public void killAll() throws IOException
{
log.info("Deleting all segment files from hdfs dir [%s].", storageDirectory.toUri().toString());
final FileSystem fs = storageDirectory.getFileSystem(config);
fs.delete(storageDirectory, true);
}

private boolean safeNonRecursiveDelete(FileSystem fs, Path path)
{
try {
@@ -22,7 +22,6 @@
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;

import io.druid.java.util.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;
@@ -115,6 +114,15 @@ private static String mergePaths(String path1, String path2)
{
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
}

@Override
public void killAll() throws IOException
{
log.info("Deleting all task logs from hdfs dir [%s].", config.getDirectory());
Path taskLogDir = new Path(config.getDirectory());
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
fs.delete(taskLogDir, true);
}
}


@@ -41,7 +41,17 @@ public class HdfsDataSegmentKillerTest
public void testKill() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);

FileSystem fs = FileSystem.get(config);

@@ -99,7 +109,17 @@ public void testKill() throws Exception
public void testKillNonExistingSegment() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);
killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString()));
}

@@ -20,7 +20,6 @@
package io.druid.storage.s3;

import com.google.inject.Inject;

import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
@@ -29,6 +28,7 @@
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;

import java.io.IOException;
import java.util.Map;

/**
@@ -69,4 +69,10 @@ public void kill(DataSegment segment) throws SegmentLoadingException
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e);
}
}

@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}
@@ -138,4 +138,10 @@ private String getTaskLogKey(String taskid)
{
return String.format("%s/%s/log", config.getS3Prefix(), taskid);
}

@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}
@@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexer;

import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* Used by ResetCluster to delete the Hadoop Working Path.
*/
public class HadoopWorkingDirCleaner
{
private static final Logger log = new Logger(HadoopWorkingDirCleaner.class);

public static String runTask(String[] args) throws Exception
{
String workingPath = args[0];
log.info("Deleting indexing hadoop working path [%s].", workingPath);
Path p = new Path(workingPath);
FileSystem fs = p.getFileSystem(new Configuration());
fs.delete(p, true);

return null;
}
}
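ResetCluster itself is not included in this excerpt. A plausible sketch of the call site — building a Hadoop-aware classloader with the static `buildClassLoader` helper introduced at the end of this diff (assumed here to live on `HadoopTask`), then invoking `runTask` reflectively so Hadoop's classes resolve from that loader — could look roughly like the following; the class, package, and variable names are assumptions.

```
import io.druid.indexing.common.task.HadoopTask;

import java.lang.reflect.Method;
import java.util.List;

// Hypothetical sketch only; the real ResetCluster implementation is not shown in this diff.
public class HadoopWorkingDirCleanerInvoker
{
  public static void clean(String hadoopWorkingPath, List<String> defaultHadoopCoordinates) throws Exception
  {
    // Build a classloader carrying the configured Hadoop dependency jars
    // (passing null for task-level coordinates falls back to the defaults, as in the hunk below).
    ClassLoader loader = HadoopTask.buildClassLoader(null, defaultHadoopCoordinates);

    // Load and invoke the cleaner through that loader so Hadoop's FileSystem classes come from it.
    Class<?> clazz = loader.loadClass("io.druid.indexer.HadoopWorkingDirCleaner");
    Method runTask = clazz.getMethod("runTask", String[].class);
    runTask.invoke(null, new Object[]{new String[]{hadoopWorkingPath}});
  }
}
```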
@@ -26,6 +26,7 @@
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.indexing.common.tasklogs.FileTaskLogs;
import io.druid.tasklogs.NoopTaskLogs;
import io.druid.tasklogs.TaskLogKiller;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogs;

@@ -46,5 +47,6 @@ public void configure(Binder binder)
binder.bind(FileTaskLogs.class).in(LazySingleton.class);

binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
}
}
@@ -127,10 +127,16 @@ public boolean apply(@Nullable URL input)
* @throws MalformedURLException from Initialization.getClassLoaderForExtension
*/
protected ClassLoader buildClassLoader(final TaskToolbox toolbox) throws MalformedURLException
{
return buildClassLoader(hadoopDependencyCoordinates, toolbox.getConfig().getDefaultHadoopCoordinates());
}

public static ClassLoader buildClassLoader(final List<String> hadoopDependencyCoordinates,
final List<String> defaultHadoopCoordinates) throws MalformedURLException
{
final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
? hadoopDependencyCoordinates
: toolbox.getConfig().getDefaultHadoopCoordinates();
: defaultHadoopCoordinates;

final List<URL> jobURLs = Lists.newArrayList(
Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs())