Skip to content

Commit

Permalink
chore(broker-core): add variable state
Browse files Browse the repository at this point in the history
- easy navigation between scopes without deserializing element instances

- extension to element instance state
- exposes operations to set variables in the scope hierachy
  and collect them in a msgpack-encoded document

- variable state is always updated when a new element instance
  event is published
- all variables are set as local variables in the variable store
- we will remove this again, when we no longer use the current payload
  concept

- command contains names of variables to fetch
- if no variables are selected, all variables are fetched
- variables are retrieved from the new variables store
  • Loading branch information
ThorbenLindhauer authored and saig0 committed Jan 8, 2019
1 parent d55c695 commit 5585d9b
Show file tree
Hide file tree
Showing 26 changed files with 994 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,34 @@
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.msgpack.value.StringValue;
import io.zeebe.msgpack.value.ValueArray;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ObjectHashSet;
import org.agrona.concurrent.UnsafeBuffer;

public class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {

private final JobState state;
private final WorkflowState workflowState;
private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();

public JobBatchActivateProcessor(final JobState state) {
public JobBatchActivateProcessor(JobState state, WorkflowState workflowState) {
this.state = state;
this.workflowState = workflowState;
}

@Override
Expand Down Expand Up @@ -89,6 +100,17 @@ private void collectJobsToActivate(JobBatchRecord value, AtomicInteger amount) {
final ValueArray<LongValue> jobKeyIterator = value.jobKeys();

// collect jobs for activation

variableNames.clear();
final ValueArray<StringValue> variables = value.variables();

variables.forEach(
v -> {
final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]);
nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity());
variableNames.add(nameCopy);
});

state.forEachActivatableJobs(
value.getType(),
(key, jobRecord) -> {
Expand All @@ -114,19 +136,48 @@ private void activateJobs(TypedStreamWriter streamWriter, JobBatchRecord value)
final Iterator<JobRecord> iterator = value.jobs().iterator();
final Iterator<LongValue> keyIt = value.jobKeys().iterator();
while (iterator.hasNext() && keyIt.hasNext()) {
final JobRecord next = iterator.next();
final JobRecord jobRecord = iterator.next();
final LongValue next1 = keyIt.next();
final long key = next1.getValue();

// update state and write follow up event for job record
final long elementInstanceKey = jobRecord.getHeaders().getElementInstanceKey();

if (elementInstanceKey >= 0) {
final DirectBuffer payload = collectPayload(variableNames, elementInstanceKey);
jobRecord.setPayload(payload);
} else {
jobRecord.setPayload(WorkflowInstanceRecord.EMPTY_PAYLOAD);
}

// we have to copy the job record because #write will reset the iterator state
final ExpandableArrayBuffer copy = new ExpandableArrayBuffer();
next.write(copy, 0);
final JobRecord jobRecord = new JobRecord();
jobRecord.wrap(copy, 0, next.getLength());
jobRecord.write(copy, 0);
final JobRecord copiedJob = new JobRecord();
copiedJob.wrap(copy, 0, jobRecord.getLength());

// update state and write follow up event for job record
state.activate(key, jobRecord);
streamWriter.appendFollowUpEvent(key, JobIntent.ACTIVATED, next);
state.activate(key, copiedJob);
streamWriter.appendFollowUpEvent(key, JobIntent.ACTIVATED, copiedJob);
}
}

private DirectBuffer collectPayload(
Collection<DirectBuffer> variableNames, long elementInstanceKey) {
final DirectBuffer payload;
if (variableNames.isEmpty()) {
payload =
workflowState
.getElementInstanceState()
.getVariablesState()
.getVariablesAsDocument(elementInstanceKey);
} else {
payload =
workflowState
.getElementInstanceState()
.getVariablesState()
.getVariablesAsDocument(elementInstanceKey, variableNames);
}
return payload;
}

private void rejectCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public static void addJobProcessors(
.onCommand(ValueType.JOB, JobIntent.UPDATE_RETRIES, new UpdateRetriesProcessor(jobState))
.onCommand(ValueType.JOB, JobIntent.CANCEL, new CancelProcessor(jobState))
.onCommand(
ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE, new JobBatchActivateProcessor(jobState))
ValueType.JOB_BATCH,
JobBatchIntent.ACTIVATE,
new JobBatchActivateProcessor(jobState, workflowState))
.withListener(new JobTimeoutTrigger(jobState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public enum ZbColumnFamilies {
TOKEN_EVENTS,
TOKEN_PARENT_CHILD,

// variable state
ELEMENT_INSTANCE_CHILD_PARENT,
VARIABLES,

// timer state
TIMERS,
TIMER_DUE_DATES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.ExpandableArrayBuffer;

public class ElementInstanceState {
Expand All @@ -59,6 +60,8 @@ public class ElementInstanceState {

private final ExpandableArrayBuffer copyBuffer = new ExpandableArrayBuffer();

private final VariablesState variablesState;

public ElementInstanceState(ZeebeDb<ZbColumnFamilies> zeebeDb) {

elementInstanceKey = new DbLong();
Expand All @@ -85,6 +88,8 @@ public ElementInstanceState(ZeebeDb<ZbColumnFamilies> zeebeDb) {
tokenParentChildColumnFamily =
zeebeDb.createColumnFamily(
ZbColumnFamilies.TOKEN_PARENT_CHILD, tokenParentStateTokenKey, DbNil.INSTANCE);

variablesState = new VariablesState(zeebeDb);
}

public ElementInstance newInstance(
Expand Down Expand Up @@ -115,6 +120,7 @@ private void writeElementInstance(ElementInstance instance) {

elementInstanceColumnFamily.put(elementInstanceKey, instance);
parentChildColumnFamily.put(parentChildKey, DbNil.INSTANCE);
variablesState.createScope(elementInstanceKey.getValue(), parentKey.getValue());
}

public ElementInstance getInstance(long key) {
Expand Down Expand Up @@ -148,6 +154,8 @@ public void removeInstance(long key) {
tokenColumnFamily.delete(compositeKey.getSecond());
});

variablesState.removeScope(key);

final long parentKey = instance.getParentKey();
if (parentKey > 0) {
final ElementInstance parentInstance = getInstance(parentKey);
Expand Down Expand Up @@ -271,6 +279,36 @@ private List<IndexedRecord> collectTokenEvents(long scopeKey, Purpose purpose) {
return records;
}

public boolean isEmpty() {
final AtomicBoolean isEmpty = new AtomicBoolean(true);

elementInstanceColumnFamily.whileTrue(
(k, v) -> {
isEmpty.compareAndSet(true, false);
return false;
});

parentChildColumnFamily.whileTrue(
(k, v) -> {
isEmpty.compareAndSet(true, false);
return false;
});

tokenColumnFamily.whileTrue(
(k, v) -> {
isEmpty.compareAndSet(true, false);
return false;
});

tokenParentChildColumnFamily.whileTrue(
(k, v) -> {
isEmpty.compareAndSet(true, false);
return false;
});

return isEmpty.get() && variablesState.isEmpty();
}

@FunctionalInterface
public interface TokenVisitor {
void visitToken(IndexedRecord indexedRecord);
Expand All @@ -295,6 +333,10 @@ private void visitTokens(long scopeKey, Purpose purpose, TokenVisitor visitor) {
});
}

public VariablesState getVariablesState() {
return variablesState;
}

public void flushDirtyState() {
for (Entry<Long, ElementInstance> entry : cachedInstances.entrySet()) {
updateInstance(entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH ([email protected])
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.state;

import io.zeebe.db.DbValue;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

public class Variable implements DbValue {

private final DirectBuffer value = new UnsafeBuffer(0, 0);

public void wrapValue(DirectBuffer buffer, int offset, int length) {
value.wrap(buffer, offset, length);
}

@Override
public void wrap(DirectBuffer buffer, int offset, int length) {
value.wrap(buffer);
}

@Override
public int getLength() {
return value.capacity();
}

@Override
public void write(MutableDirectBuffer buffer, int offset) {
buffer.putBytes(offset, value, 0, value.capacity());
}

public DirectBuffer getValue() {
return value;
}
}
Loading

0 comments on commit 5585d9b

Please sign in to comment.