KAFKA-7514: Add threads to ConsumeBenchWorker (apache#5864)
Add threads with separate consumers to ConsumeBenchWorker.  Update the Trogdor test scripts and documentation with the new functionality.

Reviewers: Colin McCabe <[email protected]>
stanislavkozlovski authored and cmccabe committed Nov 13, 2018
1 parent d00938f commit 8259fda
Showing 8 changed files with 376 additions and 88 deletions.
44 changes: 24 additions & 20 deletions TROGDOR.md
@@ -38,25 +38,23 @@ Let's confirm that all of the daemons are running:
Now, we can submit a test job to Trogdor. Here's an example of a short bash script which makes it easier.

> ./tests/bin/trogdor-run-produce-bench.sh
[2018-04-12 10:32:04,055] DEBUG Sending POST with input {"id":"produce_bench_22137","spec":{"class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec","startMs":0,"durationMs":10000000,"producerNode":"node0","bootstrapServers":"localhost:9092","targetMessagesPerSec":10,"maxMessages":100,"keyGenerator":{"type":"sequential","size":4,"startOffset":0},"valueGenerator":{"type":"constant","size":512,"value":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="},"totalTopics":10,"activeTopics":5,"topicPrefix":"foo","replicationFactor":1,"classLoader":{},"numPartitions":1}} to http://localhost:8889/coordinator/task/create (org.apache.kafka.trogdor.coordinator.CoordinatorClient)
Created task.
$TASK_ID = produce_bench_20462
Sent CreateTaskRequest for task produce_bench_21634.
$TASK_ID = produce_bench_21634
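The helper script is just a convenience; the DEBUG line above shows the coordinator REST call it ultimately makes. Below is a minimal sketch (not part of this commit) of issuing that call directly: the endpoint and the {"id", "spec"} envelope are taken from the log line, while the task id, rates, and topic layout are illustrative values following the updated ProduceBenchSpec format used elsewhere in this commit.

```python
# Sketch only: submit a ProduceBenchSpec straight to the Trogdor coordinator.
# The endpoint and payload envelope come from the DEBUG log line above; the task
# id, rates, and topic layout below are illustrative, not the script's defaults.
import json
import urllib.request

spec = {
    "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
    "startMs": 0,
    "durationMs": 10000000,
    "producerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "targetMessagesPerSec": 10000,
    "maxMessages": 50000,
    "activeTopics": {"foo[1-3]": {"numPartitions": 10, "replicationFactor": 1}},
    "inactiveTopics": {"foo[4-5]": {"numPartitions": 10, "replicationFactor": 1}},
}
payload = json.dumps({"id": "produce_bench_example", "spec": spec}).encode("utf-8")
request = urllib.request.Request(
    "http://localhost:8889/coordinator/task/create",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    print(response.status, response.read().decode("utf-8"))
```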

To get the test results, we run --show-tasks:

./bin/trogdor.sh client --show-tasks localhost:8889
Got coordinator tasks: {
"tasks" : {
"produce_bench_20462" : {
"produce_bench_21634" : {
"state" : "DONE",
"spec" : {
"class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"startMs" : 0,
"durationMs" : 10000000,
"producerNode" : "node0",
"bootstrapServers" : "localhost:9092",
"targetMessagesPerSec" : 10,
"maxMessages" : 100,
"targetMessagesPerSec" : 10000,
"maxMessages" : 50000,
"keyGenerator" : {
"type" : "sequential",
"size" : 4,
@@ -67,22 +65,28 @@ To get the test results, we run --show-tasks:
"size" : 512,
"value" : "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
},
"totalTopics" : 10,
"activeTopics" : 5,
"topicPrefix" : "foo",
"replicationFactor" : 1,
"classLoader" : { },
"numPartitions" : 1
"activeTopics" : {
"foo[1-3]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
},
"inactiveTopics" : {
"foo[4-5]" : {
"numPartitions" : 10,
"replicationFactor" : 1
}
}
},
"startedMs" : 1523552769850,
"doneMs" : 1523552780878,
"startedMs" : 1541435949784,
"doneMs" : 1541435955803,
"cancelled" : false,
"status" : {
"totalSent" : 500,
"averageLatencyMs" : 4.972,
"p50LatencyMs" : 4,
"p95LatencyMs" : 6,
"p99LatencyMs" : 12
"totalSent" : 50000,
"averageLatencyMs" : 11.0293,
"p50LatencyMs" : 9,
"p95LatencyMs" : 27,
"p99LatencyMs" : 39
}
}
}
@@ -141,7 +145,7 @@ ProduceBench starts a Kafka producer on a single agent node, producing to several
RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer.

### ConsumeBench
ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself.
ConsumeBench starts one or more Kafka consumers on a single agent node. Depending on the passed-in configuration (see ConsumeBenchSpec), the consumers either subscribe to a set of topics (leveraging consumer group functionality and dynamic partition assignment) or manually assign partitions to themselves.
The workload measures the average consume latency, as well as the median, 95th percentile, and 99th percentile latency.
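To make the per-consumer semantics concrete, here is a hedged sketch of a ConsumeBenchSpec exercising the new fields. The field names mirror the spec JSON used in this commit's scripts; the values are illustrative.

```python
# Illustrative only: build the ConsumeBenchSpec JSON for a multi-consumer worker.
import json

spec = {
    "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
    "startMs": 0,
    "durationMs": 10000000,
    "consumerNode": "node0",
    "bootstrapServers": "localhost:9092",
    "targetMessagesPerSec": 1000,   # applies to each consumer individually
    "maxMessages": 10000,           # likewise per consumer
    "threadsPerWorker": 5,          # spawn five consumers on the one agent node
    "consumerGroup": "cg",          # omit to give every consumer its own random group
    # Subscribe to whole topics and let the group assign partitions ...
    "activeTopics": ["foo[1-3]"],
    # ... or assign partitions explicitly, e.g. ["foo1:[0-4]"]. Explicit partitions
    # combined with a shared consumerGroup and threadsPerWorker > 1 are rejected.
}

print(json.dumps(spec, indent=2))
# With 5 consumers reading up to 10000 messages each, the worker as a whole
# consumes at most 50000 messages.
```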

Faults
5 changes: 4 additions & 1 deletion tests/bin/trogdor-run-consume-bench.sh
@@ -25,7 +25,10 @@ cat <<EOF
"durationMs": 10000000,
"consumerNode": "node0",
"bootstrapServers": "localhost:9092",
"maxMessages": 100,
"targetMessagesPerSec": 1000,
"threadsPerWorker": 5,
"consumerGroup": "cg",
"maxMessages": 10000,
"activeTopics": ["foo[1-3]"]
}
}
8 changes: 4 additions & 4 deletions tests/bin/trogdor-run-produce-bench.sh
@@ -25,17 +25,17 @@ cat <<EOF
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 10,
"maxMessages": 100,
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 3,
"numPartitions": 10,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 3,
"numPartitions": 10,
"replicationFactor": 1
}
}
3 changes: 2 additions & 1 deletion tests/kafkatest/services/trogdor/consume_bench_workload.py
@@ -21,7 +21,7 @@
class ConsumeBenchWorkloadSpec(TaskSpec):
def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
target_messages_per_sec, max_messages, active_topics,
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None):
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None, threads_per_worker=1):
super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
self.message["class"] = "org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
self.message["consumerNode"] = consumer_node
@@ -32,6 +32,7 @@ def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
self.message["adminClientConf"] = admin_client_conf
self.message["commonClientConf"] = common_client_conf
self.message["activeTopics"] = active_topics
self.message["threadsPerWorker"] = threads_per_worker
if consumer_group is not None:
self.message["consumerGroup"] = consumer_group

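As a quick illustration (not part of the commit), the new keyword argument can be exercised as below; it assumes the kafkatest package from the tests directory is importable and that TaskSpec exposes the task JSON as a "message" dict, as the assignments above suggest.

```python
# Sketch: construct a spec with the new threads_per_worker argument and inspect
# the serialized message. Values are illustrative.
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadSpec

spec = ConsumeBenchWorkloadSpec(0, 10000000, "node0", "localhost:9092",
                                target_messages_per_sec=1000,
                                max_messages=10000,
                                active_topics=["foo[1-3]"],
                                consumer_conf={},
                                common_client_conf={},
                                admin_client_conf={},
                                consumer_group="cg",      # optional; omitted -> random group per consumer
                                threads_per_worker=5)     # new in this commit; defaults to 1

print(spec.message["threadsPerWorker"])  # 5
print(spec.message["consumerGroup"])     # "cg"; the key is absent when consumer_group is None
```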
90 changes: 81 additions & 9 deletions tests/kafkatest/tests/core/consume_bench_test.py
@@ -86,7 +86,7 @@ def test_consume_bench(self, topics):
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

def test_consume_bench_single_partition(self):
def test_single_partition(self):
"""
Run a ConsumeBench against a single partition
"""
@@ -107,9 +107,32 @@ def test_consume_bench_single_partition(self):
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

def test_consume_group_bench(self):
def test_multiple_consumers_random_group_topics(self):
"""
Runs two ConsumeBench workloads in the same consumer group to read messages from topics
Runs multiple consumers to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently.
"""
self.produce_messages(self.active_topics, max_messages=5000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=5000, # all should read exactly 5k messages
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=5,
active_topics=["consume_bench_topic[0-5]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

def test_two_consumers_specified_group_topics(self):
"""
Runs two consumers in the same consumer group to read messages from topics.
Since a consumerGroup is specified, each consumer should dynamically get assigned partitions from the group.
"""
self.produce_messages(self.active_topics)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@@ -120,13 +143,62 @@ def test_consume_group_bench(self):
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=2,
consumer_group="testGroup",
active_topics=["consume_bench_topic[0-5]"])
consume_workload_1 = self.trogdor.create_task("consume_workload_1", consume_spec)
consume_workload_2 = self.trogdor.create_task("consume_workload_2", consume_spec)
consume_workload_1.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 1 finished")
consume_workload_2.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload 2 finished")
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

def test_multiple_consumers_random_group_partitions(self):
"""
Runs multiple consumers to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will be assigned a random group
and consume from all of the specified partitions.
"""
self.produce_messages(self.active_topics, max_messages=20000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2000,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=4,
active_topics=["consume_bench_topic1:[0-4]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
consume_workload.wait_for_done(timeout_sec=360)
self.logger.debug("Consume workload finished")
tasks = self.trogdor.tasks()
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

def test_multiple_consumers_specified_group_partitions_should_raise(self):
"""
Runs multiple consumers in the same consumer group to read messages from specific partitions.
Since a consumerGroup is specified and partitions are assigned explicitly, the task should fail
because the manually-assigned partitions cannot be split across consumers in the group.
"""
self.produce_messages(self.active_topics, max_messages=20000)
consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.consumer_workload_service.consumer_node,
self.consumer_workload_service.bootstrap_servers,
target_messages_per_sec=1000,
max_messages=2000,
consumer_conf={},
admin_client_conf={},
common_client_conf={},
threads_per_worker=4,
consumer_group="fail_group",
active_topics=["consume_bench_topic1:[0-4]"])
consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
try:
consume_workload.wait_for_done(timeout_sec=360)
raise Exception("Should have raised an exception due to an invalid configuration")
except RuntimeError as e:
if 'Will not split partitions' not in str(e):
raise RuntimeError("Unexpected Exception - " + str(e))
self.logger.info(e)

tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.trogdor.common.StringExpander;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
@@ -61,6 +62,15 @@
* #{@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
* It will be assigned partitions dynamically from the consumer group.
*
* This specification supports spawning multiple consumers in a single Trogdor worker agent.
* The "threadsPerWorker" field denotes how many consumers should be spawned for this spec.
* Note that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply to every consumer individually.
*
* If a consumer group is not specified, every consumer is assigned a different, random group. When specified, all consumers use the same group.
* Since no two consumers in the same group can be assigned the same partition,
* explicitly specifying partitions in "activeTopics" when "threadsPerWorker" is greater than one
* and a particular "consumerGroup" is set will result in a #{@link ConfigException}, aborting the task.
*
* An example JSON representation which will result in a consumer that is part of the consumer group "cg" and
* subscribed to topics foo1, foo2, foo3 and bar.
* #{@code
@@ -77,7 +87,6 @@
*/
public class ConsumeBenchSpec extends TaskSpec {

static final String EMPTY_CONSUMER_GROUP = "";
private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+(:[\\d]+|[^:]*)$";
private final String consumerNode;
private final String bootstrapServers;
@@ -88,6 +97,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final Map<String, String> commonClientConf;
private final List<String> activeTopics;
private final String consumerGroup;
private final int threadsPerWorker;

@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -100,6 +110,7 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("threadsPerWorker") Integer threadsPerWorker,
@JsonProperty("activeTopics") List<String> activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
@@ -110,7 +121,8 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup;
this.consumerGroup = consumerGroup == null ? "" : consumerGroup;
this.threadsPerWorker = threadsPerWorker == null ? 1 : threadsPerWorker;
}

@JsonProperty
@@ -138,6 +150,11 @@ public int maxMessages() {
return maxMessages;
}

@JsonProperty
public int threadsPerWorker() {
return threadsPerWorker;
}

@JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;