Skip to content

Commit

Permalink
chore(broker-core): spawn and consume tokens explicitly
Browse files Browse the repository at this point in the history
* tokens are not consumed implicitly when an event is processed
* tokens are not spawned implicitly when an event is produced
* deferToken() doesn't spawn a token
* expose element instance state in context
* remove UPDATE_PAYLOAD hack
  • Loading branch information
saig0 committed Jan 7, 2019
1 parent 396c14e commit 5080071
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.workflow.model.element.ExecutableFlowElement;
import io.zeebe.broker.workflow.state.ElementInstance;
import io.zeebe.broker.workflow.state.ElementInstanceState;
import io.zeebe.msgpack.mapping.MsgPackMergeTool;
import io.zeebe.protocol.impl.record.value.incident.ErrorType;
import io.zeebe.protocol.impl.record.value.incident.IncidentRecord;
Expand All @@ -41,6 +42,7 @@ public class BpmnStepContext<T extends ExecutableFlowElement> {
private ExecutableFlowElement element;
private TypedCommandWriter commandWriter;

private ElementInstanceState elementInstanceState;
private ElementInstance flowScopeInstance;
private ElementInstance elementInstance;

Expand Down Expand Up @@ -131,4 +133,12 @@ public void raiseIncident(ErrorType errorType, String errorMessage) {
eventOutput.storeFailedToken(record);
commandWriter.appendNewCommand(IncidentIntent.CREATE, incidentCommand);
}

public void setElementInstanceState(ElementInstanceState elementInstanceState) {
this.elementInstanceState = elementInstanceState;
}

public ElementInstanceState getElementInstanceState() {
return elementInstanceState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void processRecord(
populateEventContext(record, streamWriter, sideEffect);

if (stepGuards.shouldHandle(context)) {
state.onEventConsumed(record);
stepHandlers.handle(context);
elementInstanceState.flushDirtyState();
}
Expand All @@ -90,6 +89,7 @@ private void populateEventContext(

context.setRecord(record);
context.setStreamWriter(streamWriter);
context.setElementInstanceState(workflowState.getElementInstanceState());

context.getSideEffect().clear();
sideEffect.accept(context.getSideEffect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.zeebe.broker.workflow.state.DeployedWorkflow;
import io.zeebe.broker.workflow.state.ElementInstance;
import io.zeebe.broker.workflow.state.ElementInstanceState;
import io.zeebe.broker.workflow.state.IndexedRecord;
import io.zeebe.broker.workflow.state.StoredRecord;
import io.zeebe.broker.workflow.state.StoredRecord.Purpose;
import io.zeebe.broker.workflow.state.TimerInstance;
Expand All @@ -44,7 +43,6 @@
import io.zeebe.protocol.intent.TimerIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.List;
import org.agrona.DirectBuffer;

public class CatchEventBehavior {
Expand Down Expand Up @@ -137,11 +135,6 @@ public boolean occurEventForElement(
streamWriter.appendFollowUpEvent(
elementInstanceKey, WorkflowInstanceIntent.EVENT_OCCURRED, workflowInstanceRecord);

// TODO (saig0): While processing the event a token is consumed. We need to spawn a new one
// here explicitly - see #1767
elementInstanceState.getInstance(elementInstance.getParentKey()).spawnToken();
elementInstanceState.flushDirtyState();

return true;

} else {
Expand Down Expand Up @@ -173,25 +166,28 @@ public void deferEvent(BpmnStepContext<?> context) {
}

context.getOutput().deferEvent(context.getRecord());

// spawn a new token to continue at the event
context.getFlowScopeInstance().spawnToken();
}

public void triggerDeferredEvent(BpmnStepContext<?> context) {
final TypedRecord<WorkflowInstanceRecord> record = context.getRecord();
final long elementInstanceKey = record.getKey();
final long scopeInstanceKey = record.getValue().getScopeInstanceKey();
final List<IndexedRecord> deferredTokens =
state.getWorkflowState().getElementInstanceState().getDeferredTokens(scopeInstanceKey);

for (IndexedRecord token : deferredTokens) {
if (token.getKey() == elementInstanceKey
&& token.getState() == WorkflowInstanceIntent.EVENT_OCCURRED) {
final StoredRecord token =
state.getWorkflowState().getElementInstanceState().getTokenEvent(elementInstanceKey);

context
.getOutput()
.appendNewEvent(WorkflowInstanceIntent.EVENT_TRIGGERING, token.getValue());
if (token != null
&& token.getPurpose() == Purpose.DEFERRED_TOKEN
&& token.getRecord().getState() == WorkflowInstanceIntent.EVENT_OCCURRED) {

context.getOutput().consumeDeferredEvent(scopeInstanceKey, elementInstanceKey);
}
context
.getOutput()
.appendNewEvent(WorkflowInstanceIntent.EVENT_TRIGGERING, token.getRecord().getValue());

context.getOutput().consumeDeferredEvent(scopeInstanceKey, elementInstanceKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ public void handle(final BpmnStepContext<ExecutableCatchEventSupplier> context)

} catch (MessageCorrelationKeyException e) {
context.raiseIncident(ErrorType.EXTRACT_VALUE_ERROR, e.getMessage());

// TODO (saig0): While processing the event a token is consumed. If an incident is raised then
// we don't defer and spawn a token. So we need to spawn a new one here explicitly - see #1767
context.getFlowScopeInstance().spawnToken();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,37 @@ public void handle(BpmnStepContext<ExecutableBoundaryEvent> context) {
final ExecutableBoundaryEvent element = context.getElement();

if (element.cancelActivity()) {
final ElementInstance elementInstance = context.getElementInstance();
triggerInterruptingBoundaryEvent(context);

if (elementInstance != null
&& elementInstance.getState() == WorkflowInstanceIntent.ELEMENT_ACTIVATED) {
} else {
triggerNonInterruptingBoundaryEvent(context);
}
}

context.getCatchEventBehavior().deferEvent(context);
private void triggerInterruptingBoundaryEvent(BpmnStepContext<ExecutableBoundaryEvent> context) {
final ElementInstance elementInstance = context.getElementInstance();

if (elementInstance != null
&& elementInstance.getState() == WorkflowInstanceIntent.ELEMENT_ACTIVATED) {

context.getCatchEventBehavior().deferEvent(context);

context
.getOutput()
.appendFollowUpEvent(
context.getRecord().getKey(),
WorkflowInstanceIntent.ELEMENT_TERMINATING,
context.getElementInstance().getValue());
}
} else {
context
.getOutput()
.appendNewEvent(WorkflowInstanceIntent.EVENT_TRIGGERING, context.getRecord().getValue());
.appendFollowUpEvent(
context.getRecord().getKey(),
WorkflowInstanceIntent.ELEMENT_TERMINATING,
context.getElementInstance().getValue());
}
}

private void triggerNonInterruptingBoundaryEvent(
BpmnStepContext<ExecutableBoundaryEvent> context) {
context
.getOutput()
.appendNewEvent(WorkflowInstanceIntent.EVENT_TRIGGERING, context.getRecord().getValue());

// spawn a new token to continue at the event
context.getFlowScopeInstance().spawnToken();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public void handle(BpmnStepContext<ExecutableFlowNode> context) {
final ElementInstance scopeInstance = context.getFlowScopeInstance();
final WorkflowInstanceRecord scopeInstanceValue = scopeInstance.getValue();

scopeInstance.consumeToken();

if (scopeInstance.getNumberOfActiveExecutionPaths() == 0) {
scopeInstanceValue.setPayload(value.getPayload());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ public class PropagateTerminationHandler implements BpmnStepHandler<ExecutableFl
@Override
public void handle(BpmnStepContext<ExecutableFlowNode> context) {
final ElementInstance flowScopeInstance = context.getFlowScopeInstance();
final int activeExecutionPaths = flowScopeInstance.getNumberOfActiveExecutionPaths();

// consume the current token
flowScopeInstance.consumeToken();

// and check for other tokens of the flow scope
final int activeExecutionPaths = flowScopeInstance.getNumberOfActiveExecutionPaths();
if (activeExecutionPaths > 0) {
context.getCatchEventBehavior().triggerDeferredEvent(context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.zeebe.broker.workflow.model.element.ExecutableSequenceFlow;
import io.zeebe.broker.workflow.processor.BpmnStepContext;
import io.zeebe.broker.workflow.processor.BpmnStepHandler;
import io.zeebe.broker.workflow.state.ElementInstance;
import io.zeebe.msgpack.el.CompiledJsonCondition;
import io.zeebe.msgpack.el.JsonConditionException;
import io.zeebe.msgpack.el.JsonConditionInterpreter;
Expand All @@ -47,29 +46,13 @@ public void handle(BpmnStepContext<ExecutableExclusiveGateway> context) {
context.getOutput().appendNewEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, value);
} else {
final String errorMessage = "All conditions evaluated to false and no default flow is set.";
raiseIncident(context, errorMessage);
context.raiseIncident(ErrorType.CONDITION_ERROR, errorMessage);
}
} catch (JsonConditionException e) {
raiseIncident(context, e.getMessage());
context.raiseIncident(ErrorType.CONDITION_ERROR, e.getMessage());
}
}

private void raiseIncident(
BpmnStepContext<ExecutableExclusiveGateway> context, final String errorMessage) {
context.raiseIncident(ErrorType.CONDITION_ERROR, errorMessage);

// TODO: this is a hack to avoid that we believe this token is consumed when the incident is
// raised (because no follow-up token event is published), which could wrongfully lead to
// scope completion on a parallel branch.
// Ideas to resolve this:
// - explicitly represent incidents in the index, so we can consider them when checking if a
// scope can complete etc.
// - rework incident concept via https://github.com/zeebe-io/zeebe/issues/1033; maybe this
// will also then go away
final ElementInstance flowScopeInstance = context.getFlowScopeInstance();
flowScopeInstance.spawnToken();
}

private ExecutableSequenceFlow getSequenceFlowWithFulfilledCondition(
ExecutableExclusiveGateway exclusiveGateway, DirectBuffer payload) {
final List<ExecutableSequenceFlow> sequenceFlows = exclusiveGateway.getOutgoingWithCondition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,23 @@
import io.zeebe.broker.workflow.processor.EventOutput;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import java.util.List;

public class ParallelSplitHandler implements BpmnStepHandler<ExecutableFlowNode> {

@Override
public void handle(final BpmnStepContext<ExecutableFlowNode> context) {
final ExecutableFlowNode element = context.getElement();
final WorkflowInstanceRecord value = context.getValue();

final List<ExecutableSequenceFlow> outgoingFlows = element.getOutgoing();

final EventOutput eventOutput = context.getOutput();

for (final ExecutableSequenceFlow flow : outgoingFlows) {
// consume the incoming token and spawn a new one for each outgoing sequence flow
context.getFlowScopeInstance().consumeToken();

for (final ExecutableSequenceFlow flow : element.getOutgoing()) {
value.setElementId(flow.getId());
eventOutput.appendNewEvent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, value);

context.getFlowScopeInstance().spawnToken();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,28 @@ public ParallelMergeHandler(WorkflowState workflowState) {

@Override
public void handle(BpmnStepContext<ExecutableSequenceFlow> context) {

final EventOutput eventOutput = context.getOutput();

final ElementInstance scopeInstance = context.getFlowScopeInstance();
eventOutput.deferEvent(context.getRecord());

final EventOutput eventOutput = context.getOutput();
final ExecutableSequenceFlow sequenceFlow = context.getElement();
final ExecutableFlowNode gateway = sequenceFlow.getTarget();

final List<IndexedRecord> mergeableRecords = getMergeableRecords(gateway, scopeInstance);
eventOutput.deferEvent(context.getRecord());

final List<IndexedRecord> mergeableRecords = getMergeableRecords(gateway, scopeInstance);
if (mergeableRecords.size() == gateway.getIncoming().size()) {

// consume all deferred tokens and spawn a new one to continue after the gateway
mergeableRecords.forEach(
r -> eventOutput.consumeDeferredEvent(scopeInstance.getKey(), r.getKey()));
r -> {
eventOutput.consumeDeferredEvent(scopeInstance.getKey(), r.getKey());
context.getFlowScopeInstance().consumeToken();
});

final WorkflowInstanceRecord value = context.getValue();
value.setElementId(gateway.getId());
context.getOutput().appendNewEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED, value);

context.getFlowScopeInstance().spawnToken();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected void terminate(BpmnStepContext<ExecutableFlowNode> context) {
String.format("Expected to find job with key %d, but no job found", jobKey));
}

context.getOutput().getStreamWriter().appendFollowUpCommand(jobKey, JobIntent.CANCEL, job);
context.getCommandWriter().appendFollowUpCommand(jobKey, JobIntent.CANCEL, job);
resolveExistingJobIncident(jobKey, context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/
package io.zeebe.broker.workflow.processor.subprocess;

import io.zeebe.broker.workflow.model.element.ExecutableCatchEventElement;
import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer;
import io.zeebe.broker.workflow.processor.BpmnStepContext;
import io.zeebe.broker.workflow.processor.BpmnStepHandler;
import io.zeebe.broker.workflow.state.IndexedRecord;
import io.zeebe.broker.workflow.state.StoredRecord.Purpose;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
Expand All @@ -38,6 +38,7 @@ public TriggerStartEventHandler(WorkflowState workflowState) {
@Override
public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {
final ExecutableFlowElementContainer element = context.getElement();
final long scopeInstanceKey = context.getRecord().getKey();

final WorkflowInstanceRecord value = context.getValue();

Expand All @@ -53,24 +54,23 @@ public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {
workflowState.getElementInstanceState().getDeferredTokens(wfInstanceKey);

if (deferredTokens.size() > 0) {
// if there are deferred tokens

value.setElementId(deferredTokens.get(0).getValue().getElementId());
workflowState.getElementInstanceState().consumeToken(wfInstanceKey);
final IndexedRecord deferredToken = deferredTokens.get(0);
value.setElementId(deferredToken.getValue().getElementId());
workflowState
.getElementInstanceState()
.removeStoredRecord(wfInstanceKey, deferredToken.getKey(), Purpose.DEFERRED_TOKEN);
} else {
// if there are no tokens for the timer start event

throw new RuntimeException(
"Workflow has multiple start events but no deferred token was found");
}
}

value.setScopeInstanceKey(context.getRecord().getKey());
value.setScopeInstanceKey(scopeInstanceKey);

context.getOutput().appendNewEvent(WorkflowInstanceIntent.EVENT_TRIGGERING, value);
}

public static boolean isNoneStartEvent(ExecutableCatchEventElement startEvent) {
return !startEvent.isTimer() && !startEvent.isMessage();
// spawn a new token to continue at the start event
context.getElementInstanceState().spawnToken(scopeInstanceKey);
}
}
Loading

0 comments on commit 5080071

Please sign in to comment.