Skip to content

Commit

Permalink
Handle nullable taskTypes for rolling upgrade (apache#5309)
Browse files Browse the repository at this point in the history
  • Loading branch information
jihoonson authored and gianm committed Jan 30, 2018
1 parent 64ee658 commit 3a69b0e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 3 deletions.
5 changes: 4 additions & 1 deletion api/src/main/java/io/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TaskStatusPlus
@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("type") String type,
@JsonProperty("type") @Nullable String type, // nullable for backward compatibility
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("state") @Nullable TaskState state,
Expand All @@ -65,6 +65,7 @@ public String getId()
return id;
}

@Nullable
@JsonProperty
public String getType()
{
Expand All @@ -83,12 +84,14 @@ public DateTime getQueueInsertionTime()
return queueInsertionTime;
}

@Nullable
@JsonProperty
public TaskState getState()
{
return state;
}

@Nullable
@JsonProperty
public Long getDuration()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;

import javax.annotation.Nullable;

/**
* A holder for a task and different components associated with the task
*/
Expand Down Expand Up @@ -85,6 +87,11 @@ public DateTime getQueueInsertionTime()
}

public abstract TaskLocation getLocation();

/**
* Returns the type of task. The return value can be null for backward compatibility.
*/
@Nullable
public abstract String getTaskType();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
} else {
workItems = taskRunner.getRunningTasks()
.stream()
.filter(workitem -> workitem.getTaskType().equals(taskType))
.filter(workitem -> {
final String itemType = workitem.getTaskType();
return itemType != null && itemType.equals(taskType);
})
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final int numRunningCompactTasks = indexingServiceClient
.getRunningTasks()
.stream()
.filter(status -> status.getType().equals(COMPACT_TASK_TYPE))
.filter(status -> {
final String taskType = status.getType();
// taskType can be null if middleManagers are running with an older version. Here, we consevatively regard
// the tasks of the unknown taskType as the compactionTask. This is because it's important to not run
// compactionTasks more than the configured limit at any time which might impact to the ingestion
// performance.
return taskType == null || taskType.equals(COMPACT_TASK_TYPE);
})
.collect(Collectors.toList())
.size();
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
Expand Down

0 comments on commit 3a69b0e

Please sign in to comment.