Skip to content

Commit

Permalink
chore(benchmark): worker completes jobs async
Browse files Browse the repository at this point in the history
  • Loading branch information
korthout committed Aug 19, 2020
1 parent 6981979 commit c964407
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2017 camunda services GmbH ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe;

import io.zeebe.Worker.DelayedCommand;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;

public class DelayedCommandSender extends Thread {

private volatile boolean shuttingDown = false;
private final BlockingDeque<DelayedCommand> commands;
private final BlockingQueue<Future<?>> requestFutures;

public DelayedCommandSender(
final BlockingDeque<DelayedCommand> delayedCommands,
final BlockingQueue<Future<?>> requestFutures) {
this.commands = delayedCommands;
this.requestFutures = requestFutures;
}

@Override
public void run() {
while (!shuttingDown) {
try {
final var delayedCommand = commands.takeFirst();
if (!delayedCommand.hasExpired()) {
commands.addFirst(delayedCommand);
} else {
requestFutures.add(delayedCommand.getCommand().send());
}
} catch (InterruptedException e) {
// ignore and retry
}
}
}

public void close() {
shuttingDown = true;
interrupt();
}
}
48 changes: 42 additions & 6 deletions benchmarks/project/src/main/java/io/zeebe/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import io.zeebe.client.ZeebeClient;
import io.zeebe.client.ZeebeClientBuilder;
import io.zeebe.client.api.command.FinalCommandStep;
import io.zeebe.client.api.worker.JobWorker;
import io.zeebe.config.AppCfg;
import io.zeebe.config.WorkerCfg;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class Worker extends App {

Expand All @@ -39,6 +43,7 @@ public void run() {
final long completionDelay = workerCfg.getCompletionDelay().toMillis();
final var variables = readVariables(workerCfg.getPayloadPath());
final BlockingQueue<Future<?>> requestFutures = new ArrayBlockingQueue<>(10_000);
final BlockingDeque<DelayedCommand> delayedCommands = new LinkedBlockingDeque<>(10_000);

final ZeebeClient client = createZeebeClient();
printTopology(client);
Expand All @@ -49,25 +54,37 @@ public void run() {
.jobType(jobType)
.handler(
(jobClient, job) -> {
try {
Thread.sleep(completionDelay);
} catch (Exception e) {
e.printStackTrace();
final var command =
jobClient.newCompleteCommand(job.getKey()).variables(variables);
if (workerCfg.isCompleteJobsAsync()) {
delayedCommands.addLast(
new DelayedCommand(Instant.now().plusMillis(completionDelay), command));
} else {
try {
Thread.sleep(completionDelay);
} catch (Exception e) {
e.printStackTrace();
}
requestFutures.add(command.send());
}
requestFutures.add(
jobClient.newCompleteCommand(job.getKey()).variables(variables).send());
})
.open();

final ResponseChecker responseChecker = new ResponseChecker(requestFutures);
responseChecker.start();

final var asyncJobCompleter = new DelayedCommandSender(delayedCommands, requestFutures);
if (workerCfg.isCompleteJobsAsync()) {
asyncJobCompleter.start();
}

Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
worker.close();
client.close();
asyncJobCompleter.close();
responseChecker.close();
}));
}
Expand Down Expand Up @@ -95,4 +112,23 @@ private ZeebeClient createZeebeClient() {
public static void main(String[] args) {
createApp(Worker::new);
}

static final class DelayedCommand {

private final Instant expiration;
private final FinalCommandStep<?> command;

public DelayedCommand(final Instant expiration, final FinalCommandStep<?> command) {
this.expiration = expiration;
this.command = command;
}

public boolean hasExpired() {
return Instant.now().isAfter(expiration);
}

public FinalCommandStep<?> getCommand() {
return command;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class WorkerCfg {
private int capacity;
private Duration pollingDelay;
private Duration completionDelay;
private boolean completeJobsAsync;

private String payloadPath;

Expand Down Expand Up @@ -85,4 +86,12 @@ public String getPayloadPath() {
public void setPayloadPath(String payloadPath) {
this.payloadPath = payloadPath;
}

public boolean isCompleteJobsAsync() {
return completeJobsAsync;
}

public void setCompleteJobsAsync(boolean completeJobsAsync) {
this.completeJobsAsync = completeJobsAsync;
}
}
1 change: 1 addition & 0 deletions benchmarks/project/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ app {
capacity = 30
pollingDelay = 1s
completionDelay = 300ms
completeJobsAsync = false
payloadPath = "bpmn/big_payload.json"
}
}

0 comments on commit c964407

Please sign in to comment.