Skip to content

Commit

Permalink
Closes nirmata#22 - Added withTaskRunnerService() to the builder so t…
Browse files Browse the repository at this point in the history
…hat users can control which thread executes tasks
  • Loading branch information
Randgalt committed Feb 10, 2016
1 parent c6068f3 commit bd08263
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
20 changes: 19 additions & 1 deletion src/main/java/com/nirmata/workflow/WorkflowManagerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.nirmata.workflow.admin.AutoCleaner;
import com.nirmata.workflow.details.AutoCleanerHolder;
import com.nirmata.workflow.details.TaskExecutorSpec;
Expand All @@ -32,6 +33,7 @@
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* Builds {@link WorkflowManager} instances
Expand All @@ -43,6 +45,7 @@ public class WorkflowManagerBuilder
private CuratorFramework curator;
private AutoCleanerHolder autoCleanerHolder = newNullHolder();
private Serializer serializer = new StandardSerializer();
private ExecutorService taskRunnerService = MoreExecutors.sameThreadExecutor();

private final List<TaskExecutorSpec> specs = Lists.newArrayList();

Expand Down Expand Up @@ -138,7 +141,7 @@ public WorkflowManagerBuilder withInstanceName(String instanceName)
*/
public WorkflowManager build()
{
return new WorkflowManagerImpl(curator, queueFactory, instanceName, specs, autoCleanerHolder, serializer);
return new WorkflowManagerImpl(curator, queueFactory, instanceName, specs, autoCleanerHolder, serializer, taskRunnerService);
}

/**
Expand Down Expand Up @@ -182,6 +185,21 @@ public WorkflowManagerBuilder withSerializer(Serializer serializer)
return this;
}

/**
* <em>optional</em><br>
* By default, tasks are run in an internal executor service. Use this to specify a custom executor service
* for tasks. This executor does not add any async/concurrency benefit. It's purpose is to allow you to control
* which thread executes your tasks.
*
* @param taskRunnerService custom executor service
* @return this (for chaining)
*/
public WorkflowManagerBuilder withTaskRunnerService(ExecutorService taskRunnerService)
{
this.taskRunnerService = Preconditions.checkNotNull(taskRunnerService, "taskRunnerService cannot be null");
return this;
}

private WorkflowManagerBuilder()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.nirmata.workflow.admin.WorkflowAdmin;
import com.nirmata.workflow.details.internalmodels.RunnableTask;
import com.nirmata.workflow.details.internalmodels.StartedTask;
import com.nirmata.workflow.events.WorkflowEvent;
import com.nirmata.workflow.events.WorkflowListenerManager;
import com.nirmata.workflow.executor.TaskExecution;
import com.nirmata.workflow.executor.TaskExecutor;
Expand All @@ -41,21 +40,21 @@
import com.nirmata.workflow.queue.QueueConsumer;
import com.nirmata.workflow.queue.QueueFactory;
import com.nirmata.workflow.serialization.Serializer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -69,6 +68,7 @@ public class WorkflowManagerImpl implements WorkflowManager, WorkflowAdmin
private final SchedulerSelector schedulerSelector;
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final Serializer serializer;
private final ExecutorService taskRunnerService;

private static final TaskType nullTaskType = new TaskType("", "", false);

Expand All @@ -79,8 +79,9 @@ private enum State
CLOSED
}

public WorkflowManagerImpl(CuratorFramework curator, QueueFactory queueFactory, String instanceName, List<TaskExecutorSpec> specs, AutoCleanerHolder autoCleanerHolder, Serializer serializer)
public WorkflowManagerImpl(CuratorFramework curator, QueueFactory queueFactory, String instanceName, List<TaskExecutorSpec> specs, AutoCleanerHolder autoCleanerHolder, Serializer serializer, ExecutorService taskRunnerService)
{
this.taskRunnerService = Preconditions.checkNotNull(taskRunnerService, "taskRunnerService cannot be null");
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
autoCleanerHolder = Preconditions.checkNotNull(autoCleanerHolder, "autoCleanerHolder cannot be null");
this.curator = Preconditions.checkNotNull(curator, "curator cannot be null");
Expand Down Expand Up @@ -499,7 +500,21 @@ private void executeTask(TaskExecutor taskExecutor, ExecutableTask executableTas
log.info("Executing task: " + executableTask);
TaskExecution taskExecution = taskExecutor.newTaskExecution(this, executableTask);

TaskExecutionResult result = taskExecution.execute();
TaskExecutionResult result;
try
{
result = taskRunnerService.submit(taskExecution::execute).get();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return;
}
catch ( ExecutionException e )
{
log.error("Could not execute task: " + executableTask, e);
throw new RuntimeException(e);
}
if ( result == null )
{
throw new RuntimeException(String.format("null returned from task executor for run: %s, task %s", executableTask.getRunId(), executableTask.getTaskId()));
Expand Down

0 comments on commit bd08263

Please sign in to comment.