Skip to content

Commit

Permalink
Merge pull request apache#125 from metamx/indexing-service-stuff
Browse files Browse the repository at this point in the history
Indexing service stuff
  • Loading branch information
cheddar committed Apr 20, 2013
2 parents 4c1cbdc + 4f4ebd5 commit 42c3f27
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public static ServiceDiscovery makeServiceDiscoveryClient(
final ServiceInstance serviceInstance =
ServiceInstance.builder()
.name(config.getServiceName().replace('/', ':'))
.address(addressFromHost(config.getHost()))
.port(config.getPort())
.build();
final ServiceDiscovery serviceDiscovery =
Expand Down Expand Up @@ -361,6 +362,16 @@ public static String makePropPath(String basePath)
return String.format("%s/%s", basePath, PROP_SUBPATH);
}

public static String addressFromHost(final String host)
{
final int colon = host.indexOf(':');
if (colon < 0) {
return host;
} else {
return host.substring(0, colon);
}
}

/**
* Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
@Config("druid.service")
public abstract String getServiceName();

@Config("druid.host")
public abstract String getHost();

@Config("druid.port")
public abstract int getPort();

Expand Down
61 changes: 61 additions & 0 deletions common/src/main/java/com/metamx/druid/db/DbConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,62 @@ public static void createConfigTable(final DBI dbi, final String configTableName
);
}

public static void createTaskTable(final DBI dbi, final String taskTableName)
{
createTable(
dbi,
taskTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`)\n"
+ ")",
taskTableName
)
);
}

public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName)
{
createTable(
dbi,
taskLogsTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
)
);
}

public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName)
{
createTable(
dbi,
taskLocksTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
)
);
}

public static void createTable(
final DBI dbi,
final String tableName,
Expand Down Expand Up @@ -125,6 +181,11 @@ private DataSource getDatasource()
dataSource.setPassword(config.getDatabasePassword());
dataSource.setUrl(config.getDatabaseConnectURI());

if (config.isValidationQuery()) {
dataSource.setValidationQuery(config.getValidationQuery());
dataSource.setTestOnBorrow(true);
}

return dataSource;
}
}
12 changes: 12 additions & 0 deletions common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,16 @@ public abstract class DbConnectorConfig
@JsonProperty("segmentTable")
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();

@JsonProperty("validationQuery")
@Config("druid.database.validation")
public boolean isValidationQuery() {
return false;
}

@JsonProperty("validationQuery")
@Config("druid.database.validationQuery")
public String getValidationQuery() {
return "SELECT 1";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
Expand All @@ -62,19 +64,22 @@ public class RealtimeIndexTask extends AbstractTask
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);

@JsonIgnore
final Schema schema;
private final Schema schema;

@JsonIgnore
final FirehoseFactory firehoseFactory;
private final FirehoseFactory firehoseFactory;

@JsonIgnore
final FireDepartmentConfig fireDepartmentConfig;
private final FireDepartmentConfig fireDepartmentConfig;

@JsonIgnore
final Period windowPeriod;
private final Period windowPeriod;

@JsonIgnore
final IndexGranularity segmentGranularity;
private final IndexGranularity segmentGranularity;

@JsonIgnore
private final DateTime minTime;

@JsonIgnore
private volatile Plumber plumber = null;
Expand All @@ -95,7 +100,8 @@ public RealtimeIndexTask(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("minTime") DateTime minTime
)
{
super(
Expand All @@ -116,6 +122,7 @@ public RealtimeIndexTask(
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity;
this.minTime = minTime;
}

@Override
Expand Down Expand Up @@ -156,7 +163,19 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
if (shutdown) {
return TaskStatus.success(getId());
}
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);

Firehose wrappedFirehose = firehoseFactory.connect();
if (minTime != null) {
log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
}

log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity,
windowPeriod
);
firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod);
}

// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
Expand Down Expand Up @@ -347,6 +366,12 @@ public IndexGranularity getSegmentGranularity()
return segmentGranularity;
}

@JsonProperty
public DateTime getMinTime()
{
return minTime;
}

public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,16 @@ public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, payload, status_code, status_payload) VALUES (:id, :created_date, :payload, :status_code, :status_payload)",
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
dbConnectorConfig.getTaskTable()
)
)
.bind("id", task.getId())
.bind("created_date", new DateTime().toString())
.bind("payload", jsonMapper.writeValueAsString(task))
.bind("status_code", status.getStatusCode().toString())
.bind("status_payload", jsonMapper.writeValueAsString(status))
.bind("datasource", task.getDataSource())
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();

return null;
Expand Down Expand Up @@ -122,21 +123,20 @@ public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET status_code = :status_code, status_payload = :status_payload WHERE id = :id AND status_code = :old_status_code",
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
dbConnectorConfig.getTaskTable()
)
)
.bind("id", status.getId())
.bind("status_code", status.getStatusCode().toString())
.bind("old_status_code", TaskStatus.Status.RUNNING.toString())
.bind("status_payload", jsonMapper.writeValueAsString(status))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
}
}
);

if(updated != 1) {
throw new IllegalStateException(String.format("Running task not found: %s", status.getId()));
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}

Expand All @@ -163,7 +163,7 @@ public Optional<Task> withHandle(Handle handle) throws Exception
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue(dbStatus.get("payload").toString(), Task.class));
return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("payload"), Task.class));
}
}
}
Expand Down Expand Up @@ -193,42 +193,48 @@ public Optional<TaskStatus> withHandle(Handle handle) throws Exception
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue(dbStatus.get("status_payload").toString(), TaskStatus.class));
return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("status_payload"), TaskStatus.class));
}
}
}
);
}

@Override
public List<String> getRunningTaskIds()
public List<Task> getRunningTasks()
{
return dbi.withHandle(
new HandleCallback<List<String>>()
new HandleCallback<List<Task>>()
{
@Override
public List<String> withHandle(Handle handle) throws Exception
public List<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id FROM %s WHERE status_code = :status_code",
"SELECT id, payload, status_payload FROM %s WHERE active = 1",
dbConnectorConfig.getTaskTable()
)
)
.bind("status_code", TaskStatus.Status.RUNNING.toString())
.list();

return Lists.transform(
dbTasks, new Function<Map<String, Object>, String>()
{
@Override
public String apply(Map<String, Object> row)
{
return row.get("id").toString();
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();

try {
final Task task = jsonMapper.readValue((byte[])row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[])row.get("status_payload"), TaskStatus.class);

if (status.isRunnable()) {
tasks.add(task);
}
} catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
);

return tasks.build();
}
}
);
Expand Down Expand Up @@ -260,7 +266,7 @@ public Integer withHandle(Handle handle) throws Exception
)
)
.bind("task_id", taskid)
.bind("lock_payload", jsonMapper.writeValueAsString(taskLock))
.bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
.execute();
}
}
Expand Down Expand Up @@ -340,7 +346,7 @@ public Integer withHandle(Handle handle) throws Exception
)
)
.bind("task_id", task.getId())
.bind("log_payload", jsonMapper.writeValueAsString(taskAction))
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
Expand Down Expand Up @@ -373,7 +379,7 @@ public List<TaskAction> withHandle(Handle handle) throws Exception
public TaskAction apply(Map<String, Object> row)
{
try {
return jsonMapper.readValue(row.get("log_payload").toString(), TaskAction.class);
return jsonMapper.readValue((byte[])row.get("log_payload"), TaskAction.class);
} catch(Exception e) {
throw Throwables.propagate(e);
}
Expand Down Expand Up @@ -405,7 +411,7 @@ public Map<Long, TaskLock> withHandle(Handle handle) throws Exception

final Map<Long, TaskLock> retMap = Maps.newHashMap();
for(final Map<String, Object> row : dbTaskLocks) {
retMap.put((Long)row.get("id"), jsonMapper.readValue(row.get("lock_payload").toString(), TaskLock.class));
retMap.put((Long)row.get("id"), jsonMapper.readValue((byte[])row.get("lock_payload"), TaskLock.class));
}
return retMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ public Optional<TaskStatus> getStatus(String taskid)
}

@Override
public List<String> getRunningTaskIds()
public List<Task> getRunningTasks()
{
giant.lock();

try {
final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
for(final TaskStuff taskStuff : tasks.values()) {
if(taskStuff.getStatus().isRunnable()) {
listBuilder.add(taskStuff.getTask().getId());
listBuilder.add(taskStuff.getTask());
}
}

Expand Down
Loading

0 comments on commit 42c3f27

Please sign in to comment.