Skip to content

Commit

Permalink
fix(broker-core): event-based gateways can't be triggered twice
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Pires committed Jan 10, 2019
1 parent b5c5435 commit e5c3836
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public boolean occurEventForElement(
final WorkflowInstanceRecord deferredRecord = tokenEvent.getRecord().getValue();
deferredRecord.setPayload(eventPayload).setElementId(eventHandlerId);

elementInstanceState.removeStoredRecord(
deferredRecord.getScopeInstanceKey(), elementInstanceKey, Purpose.DEFERRED_TOKEN);

streamWriter.appendFollowUpEvent(
elementInstanceKey, WorkflowInstanceIntent.EVENT_OCCURRED, deferredRecord);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH ([email protected])
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.gateway;

import static io.zeebe.test.util.MsgPackUtil.asMsgPack;
import static io.zeebe.util.buffer.BufferUtil.wrapString;
import static org.assertj.core.api.Assertions.assertThat;

import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.workflow.processor.WorkflowInstanceStreamProcessorRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.util.TestUtil;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

public class EventbasedGatewayStreamProcessorTest {
public static final String PROCESS_ID = "process";

public StreamProcessorRule envRule = new StreamProcessorRule();
public WorkflowInstanceStreamProcessorRule streamProcessorRule =
new WorkflowInstanceStreamProcessorRule(envRule);

@Rule public RuleChain chain = RuleChain.outerRule(envRule).around(streamProcessorRule);

private StreamProcessorControl streamProcessor;

public static final BpmnModelInstance MESSAGE_WORKFLOW =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent("start")
.eventBasedGateway()
.id("gateway")
.intermediateCatchEvent(
"message-1", c -> c.message(m -> m.name("msg-1").zeebeCorrelationKey("$.key")))
.sequenceFlowId("to-end1")
.endEvent("end1")
.moveToLastGateway()
.intermediateCatchEvent(
"message-2", c -> c.message(m -> m.name("msg-2").zeebeCorrelationKey("$.key")))
.sequenceFlowId("to-end2")
.endEvent("end2")
.done();

@Before
public void setUp() {
streamProcessor = streamProcessorRule.getStreamProcessor();
}

@Test
public void shouldOnlyExecuteOneBranchWithSimultaneousMessages() {
// given
streamProcessorRule.deploy(MESSAGE_WORKFLOW);
streamProcessor.blockAfterWorkflowInstanceRecord(
m -> m.getMetadata().getIntent() == WorkflowInstanceIntent.GATEWAY_ACTIVATED);
final TypedRecord<WorkflowInstanceRecord> instance =
streamProcessorRule.createWorkflowInstance(PROCESS_ID, asMsgPack("key", "123"));

TestUtil.waitUntil(() -> streamProcessor.isBlocked());

final long gatewayElementKey =
envRule.events().withIntent(WorkflowInstanceIntent.GATEWAY_ACTIVATED).getFirst().getKey();

final WorkflowInstanceSubscriptionRecord subscriptionRecord =
new WorkflowInstanceSubscriptionRecord();
subscriptionRecord
.setSubscriptionPartitionId(1)
.setPayload(asMsgPack("key", "123"))
.setElementInstanceKey(gatewayElementKey)
.setWorkflowInstanceKey(instance.getValue().getWorkflowInstanceKey())
.setMessageName(new UnsafeBuffer("msg-1".getBytes()));
envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscriptionRecord);

final WorkflowInstanceSubscriptionRecord secondSubscriptionRecord =
new WorkflowInstanceSubscriptionRecord();
secondSubscriptionRecord
.setPayload(asMsgPack("key", "123"))
.setSubscriptionPartitionId(1)
.setElementInstanceKey(gatewayElementKey)
.setWorkflowInstanceKey(instance.getValue().getWorkflowInstanceKey())
.setMessageName(new UnsafeBuffer("msg-2".getBytes()));
envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, secondSubscriptionRecord);

streamProcessor.unblock();

// then
TestUtil.waitUntil(
() ->
envRule
.events()
.onlyWorkflowInstanceRecords()
.withIntent(WorkflowInstanceIntent.ELEMENT_COMPLETED)
.filter(r -> r.getValue().getElementId().equals(wrapString(PROCESS_ID)))
.exists());

assertThat(
envRule
.events()
.onlyWorkflowInstanceRecords()
.withIntent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
.filter(r -> r.getValue().getElementId().equals(wrapString("to-end1")))
.exists()
^ envRule
.events()
.onlyWorkflowInstanceRecords()
.withIntent(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
.filter(r -> r.getValue().getElementId().equals(wrapString("to-end2")))
.exists())
.isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ public class EventbasedGatewayTest {
.endEvent("end2")
.done();

private static final BpmnModelInstance WORKFLOW_WITH_EQUAL_TIMERS =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent("start")
.eventBasedGateway()
.id("gateway")
.intermediateCatchEvent("timer-1", c -> c.timerWithDuration("PT2S"))
.sequenceFlowId("to-end1")
.endEvent("end1")
.moveToLastGateway()
.intermediateCatchEvent("timer-2", c -> c.timerWithDuration("PT2S"))
.sequenceFlowId("to-end2")
.endEvent("end2")
.done();

private static final BpmnModelInstance WORKFLOW_WITH_MESSAGES =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent("start")
Expand Down Expand Up @@ -215,6 +229,33 @@ public void shouldContinueWhenTimerIsTriggered() {
.exists());
}

@Test
public void shouldOnlyExecuteOneBranchWithEqualTimers() {
// given
testClient.deploy(WORKFLOW_WITH_EQUAL_TIMERS);
testClient.createWorkflowInstance(PROCESS_ID);
assertThat(RecordingExporter.timerRecords(TimerIntent.CREATED).limit(2).exists()).isTrue();
// when
brokerRule.getClock().addTime(Duration.ofSeconds(2));

// then
assertThat(
RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.GATEWAY_ACTIVATED)
.limit(1)
.count())
.isEqualTo(1);

assertThat(
RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
.withElementId("to-end2")
.exists()
^ RecordingExporter.workflowInstanceRecords(
WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
.withElementId("to-end1")
.exists())
.isTrue();
}

@Test
public void shouldContinueWhenMessageIsCorrelated() {
// given
Expand Down

0 comments on commit e5c3836

Please sign in to comment.