Skip to content

Commit

Permalink
Enforcing num of concurrent runs quota given a Flow Executable (azkab…
Browse files Browse the repository at this point in the history
…an#1590)

A couple of production issues warned us that we should enforce the MAX number of concurrent executions given a flow executable. In case people accidentally schedule flows per minute or endlessly submit flows from the client side, this PR proposes to implement a simple quota to prevent it happening.

Unit Test is added.
  • Loading branch information
kunkun-tang authored Jan 10, 2018
1 parent 351b730 commit 2add240
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class ExecutorManager extends EventHandler implements

public static final String AZKABAN_USE_MULTIPLE_EXECUTORS =
"azkaban.use.multiple.executors";
public static final String AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW =
"azkaban.max.concurrent.runs.oneflow";
static final String AZKABAN_EXECUTOR_SELECTOR_FILTERS =
"azkaban.executorselector.filters";
static final String AZKABAN_EXECUTOR_SELECTOR_COMPARATOR_PREFIX =
Expand All @@ -79,6 +81,7 @@ public class ExecutorManager extends EventHandler implements
"azkaban.activeexecutor.refresh.milisecinterval";
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_NUM_FLOW =
"azkaban.activeexecutor.refresh.flowinterval";
private static final int DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW = 30;
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS =
"azkaban.executorinfo.refresh.maxThreads";
private static final String AZKABAN_MAX_DISPATCHING_ERRORS_PERMITTED =
Expand All @@ -98,6 +101,7 @@ public class ExecutorManager extends EventHandler implements
new ConcurrentHashMap<>();
private final ExecutingManagerUpdaterThread executingManager;
private final ExecutorApiGateway apiGateway;
private final int maxConcurrentRunsOneFlow;
QueuedExecutions queuedFlows;
File cacheDir;
private QueueProcessorThread queueProcessor;
Expand All @@ -124,6 +128,11 @@ public ExecutorManager(final Props azkProps, final ExecutorLoader loader,
this.loadRunningFlows();

this.queuedFlows = new QueuedExecutions(azkProps.getLong(AZKABAN_WEBSERVER_QUEUE_SIZE, 100000));

// The default threshold is set to 30 for now, in case some users are affected. We may
// decrease this number in future, to better prevent DDos attacks.
this.maxConcurrentRunsOneFlow = azkProps.getInt(AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW,
DEFAULT_MAX_ONCURRENT_RUNS_ONEFLOW);
this.loadQueuedFlows();

this.cacheDir = new File(azkProps.getString("cache.directory", "cache"));
Expand Down Expand Up @@ -951,6 +960,7 @@ public String submitExecutableFlow(final ExecutableFlow exflow, final String use
exflow.setSubmitUser(userId);
exflow.setSubmitTime(System.currentTimeMillis());

// Get collection of running flows given a project and a specific flow name
final List<Integer> running = getRunningFlows(projectId, flowId);

ExecutionOptions options = exflow.getExecutionOptions();
Expand All @@ -963,7 +973,11 @@ public String submitExecutableFlow(final ExecutableFlow exflow, final String use
}

if (!running.isEmpty()) {
if (options.getConcurrentOption().equals(
if (running.size() > this.maxConcurrentRunsOneFlow) {
throw new ExecutorManagerException("Flow " + flowId
+ " has more than " + this.maxConcurrentRunsOneFlow + " concurrent runs. Skipping",
ExecutorManagerException.Reason.SkippedExecution);
} else if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
final Integer runningExecId = running.get(running.size() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,30 @@ public void testSubmitFlows() throws Exception {
verify(this.loader).addActiveExecutableReference(any());
}

// Too many concurrent flows will fail job submission
@Test(expected = ExecutorManagerException.class)
public void testTooManySubmitFlows() throws Exception {
testSetUpForRunningFlows();
final ExecutableFlow flow1 = TestUtils
.createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
flow1.setExecutionId(101);
final ExecutableFlow flow2 = TestUtils
.createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
flow2.setExecutionId(102);
final ExecutableFlow flow3 = TestUtils
.createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
flow3.setExecutionId(103);
final ExecutableFlow flow4 = TestUtils
.createTestExecutableFlowFromYaml("basicyamlshelltest", "bashSleep");
flow4.setExecutionId(104);
this.manager.submitExecutableFlow(flow1, this.user.getUserId());
verify(this.loader).uploadExecutableFlow(flow1);
this.manager.submitExecutableFlow(flow2, this.user.getUserId());
verify(this.loader).uploadExecutableFlow(flow2);
this.manager.submitExecutableFlow(flow3, this.user.getUserId());
this.manager.submitExecutableFlow(flow4, this.user.getUserId());
}

@Ignore
@Test
public void testFetchAllActiveFlows() throws Exception {
Expand Down Expand Up @@ -365,6 +389,9 @@ private void testSetUpForRunningFlows() throws Exception {
//so that flows will be dispatched to executors.
this.props.put(ExecutorManager.AZKABAN_QUEUEPROCESSING_ENABLED, "true");

// allow two concurrent runs give one Flow
this.props.put(ExecutorManager.AZKABAN_MAX_CONCURRENT_RUNS_ONEFLOW, 2);

final List<Executor> executors = new ArrayList<>();
final Executor executor1 = new Executor(1, "localhost", 12345, true);
final Executor executor2 = new Executor(2, "localhost", 12346, true);
Expand Down
15 changes: 15 additions & 0 deletions azkaban-common/src/test/java/azkaban/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import azkaban.executor.ExecutableFlow;
import azkaban.flow.Flow;
import azkaban.project.DirectoryYamlFlowLoader;
import azkaban.project.Project;
import azkaban.test.executions.ExecutionsTestUtil;
import azkaban.user.User;
Expand Down Expand Up @@ -53,6 +54,20 @@ public static ExecutableFlow createTestExecutableFlow(final String projectName,
return execFlow;
}

/* Helper method to create an ExecutableFlow from Yaml */
public static ExecutableFlow createTestExecutableFlowFromYaml(final String projectName,
final String flowName) throws IOException {

final Project project = new Project(11, projectName);
final DirectoryYamlFlowLoader loader = new DirectoryYamlFlowLoader(new Props());
loader.loadProjectFlow(project, ExecutionsTestUtil.getFlowDir(projectName));
project.setFlows(loader.getFlowMap());
project.setVersion(123);

final Flow flow = project.getFlow(flowName);
return new ExecutableFlow(project, flow);
}

/* Helper method to create an XmlUserManager from XML_FILE_PARAM file */
public static UserManager createTestXmlUserManager() {
final Props props = new Props();
Expand Down
20 changes: 20 additions & 0 deletions test/execution-test-data/basicyamlshelltest/bashSleep.flow
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
config:
flow-level-parameter: value

nodes:
- name: shell_end
type: noop
dependsOn:
- shell_sleep
- shell_echo

- name: shell_echo
type: command
config:
command: echo "This is an echoed text."

- name: shell_sleep
type: command
config:
command: sleep 5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
azkaban-flow-version: 2.0

0 comments on commit 2add240

Please sign in to comment.