Skip to content

Commit

Permalink
feat(broker): Correlate published messages to start events and create…
Browse files Browse the repository at this point in the history
… workflow instances
  • Loading branch information
deepthidevaki authored and menski committed Jan 8, 2019
1 parent 186f960 commit 157e66e
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,38 @@ public void validate(Message element, ValidationResultCollector validationResult
validationResultCollector.addError(
0, "Must have exactly one zeebe:subscription extension element");
}
} else if (isReferedByStartEvent(element)) {
if (element.getName() == null || element.getName().isEmpty()) {
validationResultCollector.addError(0, "Name must be present and not empty");
}
} else {
validateIfReferredByStartEvent(element, validationResultCollector);
}
}

private boolean isReferedByStartEvent(Message element) {
private void validateIfReferredByStartEvent(
Message element, ValidationResultCollector validationResultCollector) {
final Collection<StartEvent> startEvents =
element
.getParentElement()
.getChildElementsByType(Process.class)
.stream()
.flatMap(p -> p.getChildElementsByType(StartEvent.class).stream())
.collect(Collectors.toList());
return startEvents
.stream()
.flatMap(i -> i.getEventDefinitions().stream())
.anyMatch(
e ->
e instanceof MessageEventDefinition
&& ((MessageEventDefinition) e).getMessage() == element);
final long numReferredStartEvents =
startEvents
.stream()
.flatMap(i -> i.getEventDefinitions().stream())
.filter(
e ->
e instanceof MessageEventDefinition
&& ((MessageEventDefinition) e).getMessage() == element)
.count();

if (numReferredStartEvents > 1) {
validationResultCollector.addError(
0, "A message cannot be referred by more than one start event");
} else if (numReferredStartEvents == 1) {
if (element.getName() == null || element.getName().isEmpty()) {
validationResultCollector.addError(0, "Name must be present and not empty");
}
}
}

private boolean isReferedByCatchEvent(Message element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.util.Collections.singletonList;

import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.model.bpmn.builder.ProcessBuilder;
import io.zeebe.model.bpmn.instance.EndEvent;
import io.zeebe.model.bpmn.instance.IntermediateThrowEvent;
import io.zeebe.model.bpmn.instance.Message;
Expand Down Expand Up @@ -136,6 +138,19 @@ public static Object[][] parameters() {
.done(),
singletonList(expect("task", "Cannot reference the same message name as a boundary event"))
},
{
getProcessWithMultipleStartEventsWithSameMessage(),
singletonList(
expect("start-message", "A message cannot be referred by more than one start event"))
},
};
}

private static BpmnModelInstance getProcessWithMultipleStartEventsWithSameMessage() {
final ProcessBuilder process = Bpmn.createExecutableProcess();
final String messageName = "messageName";
process.startEvent("start1").message(m -> m.id("start-message").name(messageName)).endEvent();
process.startEvent("start2").message(messageName).endEvent();
return process.done();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ public static void addMessageProcessors(
.onCommand(
ValueType.MESSAGE,
MessageIntent.PUBLISH,
new PublishMessageProcessor(messageState, subscriptionState, subscriptionCommandSender))
new PublishMessageProcessor(
messageState,
subscriptionState,
startEventSubscriptionState,
subscriptionCommandSender))
.onCommand(
ValueType.MESSAGE, MessageIntent.DELETE, new DeleteMessageProcessor(messageState))
.onCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.state.Message;
import io.zeebe.broker.subscription.message.state.MessageStartEventSubscriptionState;
import io.zeebe.broker.subscription.message.state.MessageState;
import io.zeebe.broker.subscription.message.state.MessageSubscriptionState;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.collections.LongArrayList;

public class PublishMessageProcessor implements TypedRecordProcessor<MessageRecord> {

private final MessageState messageState;
private final MessageSubscriptionState subscriptionState;
private final MessageStartEventSubscriptionState startEventSubscriptionState;
private final SubscriptionCommandSender commandSender;

private TypedResponseWriter responseWriter;
Expand All @@ -50,9 +55,11 @@ public class PublishMessageProcessor implements TypedRecordProcessor<MessageReco
public PublishMessageProcessor(
final MessageState messageState,
final MessageSubscriptionState subscriptionState,
final MessageStartEventSubscriptionState startEventSubscriptionState,
final SubscriptionCommandSender commandSender) {
this.messageState = messageState;
this.subscriptionState = subscriptionState;
this.startEventSubscriptionState = startEventSubscriptionState;
this.commandSender = commandSender;
}

Expand Down Expand Up @@ -118,6 +125,8 @@ private void handleNewMessage(

sideEffect.accept(this::correlateMessage);

correlateMessageStartEvents(command, streamWriter);

if (messageRecord.getTimeToLive() > 0L) {
final Message message =
new Message(
Expand Down Expand Up @@ -160,4 +169,23 @@ private boolean correlateMessage() {

return responseWriter.flush();
}

private void correlateMessageStartEvents(
final TypedRecord<MessageRecord> command, final TypedStreamWriter streamWriter) {
final DirectBuffer messageName = command.getValue().getName();
startEventSubscriptionState.visitSubscriptionsByMessageName(
messageName,
subscription -> {
final DirectBuffer startEventId = subscription.getStartEventId();
final long workflowKey = subscription.getWorkflowKey();

final WorkflowInstanceRecord record = new WorkflowInstanceRecord();
record
.setWorkflowKey(workflowKey)
.setElementId(startEventId)
.setPayload(command.getValue().getPayload());

streamWriter.appendNewEvent(WorkflowInstanceIntent.EVENT_OCCURRED, record);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public enum BpmnStep {

// flow element container (process, sub process)
TRIGGER_START_EVENT,
CREATE_INSTANCE_ON_START_EVENT,
COMPLETE_PROCESS,
TERMINATE_CONTAINED_INSTANCES,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public void transform(StartEvent element, TransformContext context) {

private void bindLifecycle(
TransformContext context, final ExecutableCatchEventElement startEvent) {
startEvent.bindLifecycleState(
WorkflowInstanceIntent.EVENT_OCCURRED, BpmnStep.CREATE_INSTANCE_ON_START_EVENT);
startEvent.bindLifecycleState(WorkflowInstanceIntent.EVENT_TRIGGERING, BpmnStep.APPLY_EVENT);
startEvent.bindLifecycleState(
WorkflowInstanceIntent.EVENT_TRIGGERED, context.getCurrentFlowNodeOutgoingStep());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,30 @@ public BpmnStepGuards() {
final Predicate<BpmnStepContext<?>> availableFlowScopeGuard =
c -> c.getFlowScopeInstance() != null;

stepGuards.put(WorkflowInstanceIntent.ELEMENT_READY, noConcurrentTransitionGuard);
stepGuards.put(WorkflowInstanceIntent.ELEMENT_ACTIVATED, noConcurrentTransitionGuard);
stepGuards.put(WorkflowInstanceIntent.ELEMENT_COMPLETING, noConcurrentTransitionGuard);
final Predicate<BpmnStepContext<?>> isStartEvent =
c -> c.getRecord().getValue().getWorkflowInstanceKey() == -1;

stepGuards.put(
WorkflowInstanceIntent.ELEMENT_READY, noConcurrentTransitionGuard.and(hasElementInstances));
stepGuards.put(
WorkflowInstanceIntent.ELEMENT_ACTIVATED,
noConcurrentTransitionGuard.and(hasElementInstances));
stepGuards.put(
WorkflowInstanceIntent.ELEMENT_COMPLETING,
noConcurrentTransitionGuard.and(hasElementInstances));
stepGuards.put(WorkflowInstanceIntent.ELEMENT_COMPLETED, scopeActiveGuard);
stepGuards.put(WorkflowInstanceIntent.ELEMENT_TERMINATING, c -> true);
stepGuards.put(WorkflowInstanceIntent.ELEMENT_TERMINATED, availableFlowScopeGuard);
stepGuards.put(WorkflowInstanceIntent.ELEMENT_TERMINATING, hasElementInstances);
stepGuards.put(
WorkflowInstanceIntent.ELEMENT_TERMINATED,
availableFlowScopeGuard.and(hasElementInstances));

stepGuards.put(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, scopeActiveGuard);

stepGuards.put(WorkflowInstanceIntent.GATEWAY_ACTIVATED, scopeActiveGuard);

stepGuards.put(WorkflowInstanceIntent.EVENT_ACTIVATING, scopeActiveGuard);
stepGuards.put(WorkflowInstanceIntent.EVENT_ACTIVATED, scopeActiveGuard);
stepGuards.put(WorkflowInstanceIntent.EVENT_OCCURRED, scopeActiveGuard);
stepGuards.put(WorkflowInstanceIntent.EVENT_OCCURRED, isStartEvent.or(scopeActiveGuard));
stepGuards.put(WorkflowInstanceIntent.EVENT_TRIGGERING, scopeActiveGuard);
stepGuards.put(WorkflowInstanceIntent.EVENT_TRIGGERED, scopeActiveGuard);
}
Expand All @@ -67,6 +77,6 @@ public boolean shouldHandle(BpmnStepContext<?> context) {
throw new RuntimeException("no guard found for state: " + context.getState());
}

return hasElementInstances.test(context) && guard.test(context);
return guard.test(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.zeebe.broker.workflow.processor.flownode.PropagateTerminationHandler;
import io.zeebe.broker.workflow.processor.gateway.ExclusiveSplitHandler;
import io.zeebe.broker.workflow.processor.gateway.ParallelSplitHandler;
import io.zeebe.broker.workflow.processor.instance.CreateWorkflowInstanceOnStartEventHandler;
import io.zeebe.broker.workflow.processor.process.CompleteProcessHandler;
import io.zeebe.broker.workflow.processor.sequenceflow.ParallelMergeHandler;
import io.zeebe.broker.workflow.processor.sequenceflow.StartFlowNodeHandler;
Expand All @@ -59,6 +60,11 @@ public BpmnStepHandlers(WorkflowState workflowState, ZeebeState zeebeState) {

// flow element container (process, sub process)
stepHandlers.put(BpmnStep.TRIGGER_START_EVENT, new TriggerStartEventHandler(workflowState));

stepHandlers.put(
BpmnStep.CREATE_INSTANCE_ON_START_EVENT,
new CreateWorkflowInstanceOnStartEventHandler(zeebeState));

stepHandlers.put(BpmnStep.COMPLETE_PROCESS, new CompleteProcessHandler());
stepHandlers.put(
BpmnStep.TERMINATE_CONTAINED_INSTANCES, new TerminateContainedElementsHandler(zeebeState));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH ([email protected])
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.processor.instance;

import io.zeebe.broker.incident.processor.TypedWorkflowInstanceRecord;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer;
import io.zeebe.broker.workflow.processor.BpmnStepContext;
import io.zeebe.broker.workflow.processor.BpmnStepHandler;
import io.zeebe.broker.workflow.processor.EventOutput;
import io.zeebe.broker.workflow.state.DeployedWorkflow;
import io.zeebe.broker.workflow.state.IndexedRecord;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import org.agrona.DirectBuffer;

public class CreateWorkflowInstanceOnStartEventHandler
implements BpmnStepHandler<ExecutableFlowElementContainer> {

private final ZeebeState state;
private final WorkflowState workflowState;

public CreateWorkflowInstanceOnStartEventHandler(ZeebeState zeebeState) {
this.state = zeebeState;
this.workflowState = zeebeState.getWorkflowState();
}

@Override
public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {

final WorkflowInstanceRecord eventRecord = context.getRecord().getValue();

final long workflowKey = eventRecord.getWorkflowKey();
final DeployedWorkflow workflowDefinition = workflowState.getWorkflowByKey(workflowKey);

if (workflowDefinition != null) {
final long workflowInstanceKey = state.getKeyGenerator().nextKey();
final DirectBuffer bpmnId = workflowDefinition.getWorkflow().getId();
final WorkflowInstanceRecord record = new WorkflowInstanceRecord();
record
.setBpmnProcessId(bpmnId)
.setWorkflowKey(workflowDefinition.getKey())
.setVersion(workflowDefinition.getVersion())
.setElementId(bpmnId)
.setWorkflowInstanceKey(workflowInstanceKey);

final EventOutput eventOutput = context.getOutput();
eventOutput.appendFollowUpEvent(
workflowInstanceKey, WorkflowInstanceIntent.ELEMENT_READY, record);

// Defer token which will be used by the start event
eventRecord.setWorkflowInstanceKey(workflowInstanceKey);
eventRecord.setScopeInstanceKey(workflowInstanceKey);

final IndexedRecord indexedRecord =
new IndexedRecord(
workflowInstanceKey, WorkflowInstanceIntent.EVENT_TRIGGERING, eventRecord);
final TypedWorkflowInstanceRecord deferredEvent = new TypedWorkflowInstanceRecord();
deferredEvent.wrap(indexedRecord);

eventOutput.deferEvent(deferredEvent);

} else {
// this should never happen because workflows are never deleted.
throw new IllegalStateException(
String.format(
"Expected to find deployed workflow with key %d, but no workflow found",
workflowKey));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {

if (element.getStartEvents().get(0).isNone()) {
// if none start event

value.setElementId(element.getStartEvents().get(0).getId());
} else {
// if timer start event
// if timer/message start event

final long wfInstanceKey = context.getRecord().getValue().getWorkflowInstanceKey();
final List<IndexedRecord> deferredTokens =
workflowState.getElementInstanceState().getDeferredTokens(wfInstanceKey);

if (deferredTokens.size() > 0) {

final IndexedRecord deferredToken = deferredTokens.get(0);
value.setElementId(deferredToken.getValue().getElementId());
final WorkflowInstanceRecord tokenValue = deferredToken.getValue();
value.setElementId(tokenValue.getElementId());
value.setPayload(tokenValue.getPayload());
workflowState
.getElementInstanceState()
.removeStoredRecord(wfInstanceKey, deferredToken.getKey(), Purpose.DEFERRED_TOKEN);
Expand Down
Loading

0 comments on commit 157e66e

Please sign in to comment.