Skip to content

Commit

Permalink
Fix for failing AsyncExecutorTest (part4: adding logging + changing t…
Browse files Browse the repository at this point in the history
…he way was waited for jobs to be finished, not shutting down the executor)
  • Loading branch information
jbarrez committed Dec 17, 2014
1 parent 635fa2b commit 70af9a4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@ public static void waitForJobExecutorToProcessAllJobs(ActivitiRule activitiRule,
waitForJobExecutorToProcessAllJobs(activitiRule.getProcessEngine().getProcessEngineConfiguration(),
activitiRule.getManagementService(), maxMillisToWait, intervalMillis);
}

public static void waitForJobExecutorToProcessAllJobs(ProcessEngineConfiguration processEngineConfiguration,
ManagementService managementService, long maxMillisToWait, long intervalMillis) {

waitForJobExecutorToProcessAllJobs(processEngineConfiguration, managementService, maxMillisToWait, intervalMillis, true);

}

public static void waitForJobExecutorToProcessAllJobs(ProcessEngineConfiguration processEngineConfiguration,
ManagementService managementService, long maxMillisToWait, long intervalMillis, boolean shutdownExecutorWhenFinished) {

JobExecutor jobExecutor = null;
AsyncExecutor asyncExecutor = null;
Expand Down Expand Up @@ -80,11 +87,13 @@ public static void waitForJobExecutorToProcessAllJobs(ProcessEngineConfiguration
}

} finally {
if (processEngineConfiguration.isAsyncExecutorEnabled() == false) {
jobExecutor.shutdown();
} else {
asyncExecutor.shutdown();
}
if (shutdownExecutorWhenFinished) {
if (processEngineConfiguration.isAsyncExecutorEnabled() == false) {
jobExecutor.shutdown();
} else {
asyncExecutor.shutdown();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
package org.activiti.engine.test.jobexecutor;

import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.activiti.engine.ActivitiException;
import org.activiti.engine.ProcessEngine;
Expand All @@ -29,6 +26,8 @@
import org.activiti.engine.impl.test.JobTestHelper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests specifically for the {@link AsyncExecutor}.
Expand Down Expand Up @@ -274,7 +273,8 @@ private ProcessEngine createProcessEngine(boolean enableAsyncExecutor, Date time
processEngineConfiguration.setAsyncExecutorActivate(true);

CountingAsyncExecutor countingAsyncExecutor = new CountingAsyncExecutor();
countingAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(1); // To avoid waiting too long when a retry happens
countingAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(50); // To avoid waiting too long when a retry happens
countingAsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(50);
processEngineConfiguration.setAsyncExecutor(countingAsyncExecutor);
}

Expand Down Expand Up @@ -314,7 +314,7 @@ private void waitForAllJobsBeingExecuted(ProcessEngine processEngine) {
}

private void waitForAllJobsBeingExecuted(ProcessEngine processEngine, long maxWaitTime) {
JobTestHelper.waitForJobExecutorToProcessAllJobs(processEngine.getProcessEngineConfiguration(), processEngine.getManagementService(), maxWaitTime, 50L);
JobTestHelper.waitForJobExecutorToProcessAllJobs(processEngine.getProcessEngineConfiguration(), processEngine.getManagementService(), maxWaitTime, 1000L, false);
}

private int getAsyncExecutorJobCount(ProcessEngine processEngine) {
Expand All @@ -327,12 +327,16 @@ private int getAsyncExecutorJobCount(ProcessEngine processEngine) {

static class CountingAsyncExecutor extends DefaultAsyncJobExecutor {

private static final Logger logger = LoggerFactory.getLogger(CountingAsyncExecutor.class);

private AtomicInteger counter = new AtomicInteger(0);

@Override
public void executeAsyncJob(JobEntity job) {
logger.info("About to execute job " + job.getId());
counter.incrementAndGet();
super.executeAsyncJob(job);
logger.info("Handed off job " + job.getId() + " to async executor (retries=" + job.getRetries() + ")");
}

public AtomicInteger getCounter() {
Expand Down

0 comments on commit 70af9a4

Please sign in to comment.