Skip to content

Commit

Permalink
Merge branch 'gh-3966' of github.com:p-kimberley/stroom into gh-3966
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 23, 2024
2 parents 6ba6ad8 + c2ef0b9 commit 2ab1e74
Show file tree
Hide file tree
Showing 54 changed files with 1,542 additions and 562 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ subprojects {
test {
useJUnitPlatform()

// set heap size for the test JVM(s)
// minHeapSize = "128m"
// maxHeapSize = "8G"

// Run test classes in N different JVMs
// The internet seems to suggest the divide by 2, not sure why, maybe
// not to exhaust all cores. Removing it doesn't seem to speed it up.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package stroom.job.impl;

import stroom.cluster.impl.MockClusterNodeManager;
import stroom.cluster.task.impl.TargetNodeSetFactoryImpl;
import stroom.job.api.DistributedTask;
import stroom.job.api.DistributedTaskFactory;
import stroom.job.shared.Job;
import stroom.job.shared.JobNode;
import stroom.job.shared.JobNode.JobType;
import stroom.node.api.NodeInfo;
import stroom.node.mock.MockNodeInfo;
import stroom.security.mock.MockSecurityContext;
import stroom.task.api.ExecutorProvider;
import stroom.task.api.SimpleTaskContextFactory;
import stroom.task.api.TaskContextFactory;
import stroom.task.api.ThreadPoolImpl;
import stroom.task.shared.ThreadPool;
import stroom.test.common.util.test.StroomUnitTest;
import stroom.util.concurrent.ThreadUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.Metrics;
import stroom.util.scheduler.FrequencyScheduler;
import stroom.util.scheduler.Scheduler;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class TestDistributedTaskFetcher extends StroomUnitTest {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestDistributedTaskFetcher.class);

@Disabled
@Test
void test() {
try (final ExecutorService executors = Executors.newCachedThreadPool()) {
final ExecutorProvider executorProvider = new ExecutorProvider() {
@Override
public Executor get(final ThreadPool threadPool) {
return executors;
}

@Override
public Executor get() {
return executors;
}
};
final TaskContextFactory taskContextFactory = new SimpleTaskContextFactory();

final ThreadPool threadPool = new ThreadPoolImpl("MY_THREAD_POOL");
final String jobName = "MY_DISTRIBUTED_JOB";
final String nodeName = "NODE_NAME";
final String frequency = "10s";
final Job job = new Job(1, true, jobName, false);
final JobNode jobNode =
new JobNode(1, nodeName, job, 100, JobType.DISTRIBUTED, frequency, true);
final JobNodeTracker jobNodeTracker = new JobNodeTracker(jobNode);
final Scheduler scheduler = new FrequencyScheduler(frequency);
final JobNodeTrackerCache jobNodeTrackerCache = () -> new JobNodeTrackers() {
@Override
public JobNodeTracker getTrackerForJobName(final String jobName1) {
return jobNodeTracker;
}

@Override
public List<JobNodeTracker> getDistributedJobNodeTrackers() {
return List.of(jobNodeTracker);
}

@Override
public Scheduler getScheduler(final JobNode jobNode1) {
return scheduler;
}
};

final NodeInfo nodeInfo = new MockNodeInfo();
final AtomicLong executionCount = new AtomicLong();

Metrics.setEnabled(true);

final DistributedTaskFactory distributedTaskFactory = new DistributedTaskFactory() {
@Override
public List<DistributedTask> fetch(final String nodeName, final int count) {
if (count == 0) {
LOGGER.info("ZERO");
}

final List<DistributedTask> list = new ArrayList<>(count);
Metrics.measure("fetch", () -> {
for (int i = 0; i < count; i++) {
final Runnable runnable = () ->
Metrics.measure("exec task", executionCount::incrementAndGet);
final DistributedTask distributedTask =
new DistributedTask(jobName, runnable, threadPool, "test");
list.add(distributedTask);
}
});
return list;
}

@Override
public Boolean abandon(final String nodeName, final List<DistributedTask> tasks) {
return Boolean.TRUE;
}
};
final DistributedTaskFactoryRegistry distributedTaskFactoryRegistry = () ->
Map.of(jobName, distributedTaskFactory);

final DistributedTaskFetcher distributedTaskFetcher =
new DistributedTaskFetcher(
executorProvider,
taskContextFactory,
jobNodeTrackerCache,
nodeInfo,
new MockSecurityContext(),
distributedTaskFactoryRegistry,
new TargetNodeSetFactoryImpl(nodeInfo, new MockClusterNodeManager(nodeInfo)));
distributedTaskFetcher.execute();

while (true) {
Metrics.report();
ThreadUtil.sleep(1000);
}
}
}
}
137 changes: 137 additions & 0 deletions stroom-app/src/test/java/stroom/job/impl/TestJobNodeTrackerCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package stroom.job.impl;

import stroom.job.shared.FindJobNodeCriteria;
import stroom.job.shared.Job;
import stroom.job.shared.JobNode;
import stroom.job.shared.JobNode.JobType;
import stroom.job.shared.JobNodeListResponse;
import stroom.node.api.NodeInfo;
import stroom.node.mock.MockNodeInfo;
import stroom.task.api.ExecutorProvider;
import stroom.task.shared.ThreadPool;
import stroom.test.common.util.test.StroomUnitTest;
import stroom.util.concurrent.ThreadUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.Metrics;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class TestJobNodeTrackerCache extends StroomUnitTest {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestJobNodeTrackerCache.class);

@Mock
private JobNodeDao jobNodeDao;

@Test
void testQuick() throws InterruptedException {
test(0, 1, TimeUnit.SECONDS);
}

@Test
void testDelayed() throws InterruptedException {
test(5000, 1, TimeUnit.SECONDS);
}

@Disabled
@Test
void testQuickPerformance() throws InterruptedException {
test(0, 40, TimeUnit.SECONDS);
}

@Disabled
@Test
void testDelayedPerformance() throws InterruptedException {
test(5000, 40, TimeUnit.SECONDS);
}

void test(final long delay, final long totalTime, final TimeUnit timeUnit) throws InterruptedException {
final AtomicLong calls = new AtomicLong();
try (final ScheduledExecutorService executors =
Executors.newScheduledThreadPool(4)) {
final String jobName = "MY_DISTRIBUTED_JOB";
final String nodeName = "NODE_NAME";
final String frequency = "10s";
final Job job = new Job(1, true, jobName, false);
final JobNode jobNode =
new JobNode(1, nodeName, job, 100, JobType.DISTRIBUTED, frequency, true);
final JobNodeListResponse jobNodeListResponse = JobNodeListResponse
.createUnboundedJobeNodeResponse(List.of(jobNode));

Metrics.setEnabled(true);

final ExecutorProvider executorProvider = new ExecutorProvider() {
@Override
public Executor get(final ThreadPool threadPool) {
return executors;
}

@Override
public Executor get() {
return executors;
}
};
final NodeInfo nodeInfo = new MockNodeInfo();
final JobNodeTrackerCacheImpl jobNodeTrackerCache =
new JobNodeTrackerCacheImpl(nodeInfo, jobNodeDao, executorProvider);

// Bootstrap cache.
when(jobNodeDao.find(Mockito.any(FindJobNodeCriteria.class)))
.then((Answer<JobNodeListResponse>) invocation -> jobNodeListResponse);
jobNodeTrackerCache.getTrackers();

when(jobNodeDao.find(Mockito.any(FindJobNodeCriteria.class)))
.then((Answer<JobNodeListResponse>) invocation -> {
Metrics.measure("find", () -> {
// Add delay.
ThreadUtil.sleep(delay);
});
return jobNodeListResponse;
});

final AtomicBoolean running = new AtomicBoolean(true);
executors.execute(() -> {
while (running.get()) {
Metrics.measure("getTrackers", () -> {
jobNodeTrackerCache.getTrackers();
calls.incrementAndGet();
});
}
});

final CountDownLatch countDownLatch = new CountDownLatch(1);
executors.schedule(() -> {
running.set(false);
countDownLatch.countDown();
}, totalTime, timeUnit);

countDownLatch.await();
}

Metrics.report();

LOGGER.info(String.valueOf(calls.get()));
}
}
Loading

0 comments on commit 2ab1e74

Please sign in to comment.