KAFKA-5281; System tests for transactions
Author: Apurva Mehta <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes apache#3149 from apurvam/KAFKA-5281-transactions-system-tests
Apurva Mehta authored and hachikuji committed Jun 1, 2017
1 parent 8e8b3c5 commit 1959835
Showing 10 changed files with 706 additions and 6 deletions.
@@ -928,7 +928,9 @@ public void handleResponse(AbstractResponse response) {
Errors error = entry.getValue();
if (error == Errors.NONE) {
pendingTxnOffsetCommits.remove(topicPartition);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR
|| error == Errors.REQUEST_TIMED_OUT) {
hadFailure = true;
if (!coordinatorReloaded) {
coordinatorReloaded = true;
@@ -44,6 +44,7 @@ public class OffsetCommitResponse extends AbstractResponse {
* Possible error codes:
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
* REQUEST_TIMED_OUT (7)
* OFFSET_METADATA_TOO_LARGE (12)
* COORDINATOR_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
// GroupAuthorizationFailed
// InvalidCommitOffsetSize
// TransactionalIdAuthorizationFailed
// RequestTimedOut

private final Map<TopicPartition, Errors> errors;
private final int throttleTimeMs;
@@ -139,7 +139,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
case Errors.UNKNOWN_TOPIC_OR_PARTITION |
Errors.NOT_LEADER_FOR_PARTITION |
Errors.NOT_ENOUGH_REPLICAS |
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => // these are retriable errors
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
Errors.REQUEST_TIMED_OUT => // these are retriable errors

info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")
12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -200,7 +200,7 @@ object ConsoleConsumer extends Logging {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
props
}

@@ -264,7 +264,7 @@
"skip it instead of halt.")
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")
"set, the csv metrics will be output here")
.withRequiredArg
.describedAs("metrics directory")
.ofType(classOf[java.lang.String])
@@ -284,6 +284,13 @@
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)")
val isolationLevelOpt = parser.accepts("isolation-level",
"Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted" +
"to read all messages.")
.withRequiredArg()
.ofType(classOf[String])
.defaultsTo("read_uncommitted")


if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
@@ -314,6 +321,7 @@
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]

if (keyDeserializer != null && !keyDeserializer.isEmpty) {
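The new --isolation-level option defaults to read_uncommitted, so existing invocations behave as before. As a hypothetical example (the broker address and topic name are placeholders), a console consumer that skips messages from aborted or still-open transactions could be started with:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic my-topic --isolation-level read_committed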
9 changes: 7 additions & 2 deletions tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0

"""
0.8.2.1 ConsoleConsumer options
@@ -97,7 +97,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False,
isolation_level="read_uncommitted"):
"""
Args:
context: standard context
@@ -117,6 +118,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
stop_timeout_sec After stopping a node, wait up to stop_timeout_sec for the node to stop,
and the corresponding background thread to finish successfully.
print_timestamp if True, print each message's timestamp as well
isolation_level How to handle transactional messages: either read_uncommitted (the default) or read_committed.
"""
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
BackgroundThreadService.__init__(self, context, num_nodes)
@@ -140,6 +142,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
self.log_level = "TRACE"
self.stop_timeout_sec = stop_timeout_sec

self.isolation_level = isolation_level
self.enable_systest_events = enable_systest_events
if self.enable_systest_events:
# Only available in 0.10.0 and up
@@ -190,6 +193,8 @@ def start_cmd(self, node):
if node.version <= LATEST_0_10_0:
cmd += " --new-consumer"
cmd += " --bootstrap-server %(broker_list)s" % args
if node.version >= V_0_11_0_0:
cmd += " --isolation-level %s" % self.isolation_level
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
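From a system test, the new isolation_level keyword is forwarded to the console consumer as --isolation-level on nodes running 0.11.0.0 or later. A minimal sketch of how a ducktape test might use it; the topic name and timeout are hypothetical, and self.kafka is assumed to be a running KafkaService:

    from kafkatest.services.console_consumer import ConsoleConsumer

    # Consume only committed messages from the output topic.
    consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka,
                               topic="output-topic", consumer_timeout_ms=10000,
                               isolation_level="read_committed")
    consumer.start()
    consumer.wait()
    messages = consumer.messages_consumed[1]  # messages seen by node 1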
183 changes: 183 additions & 0 deletions tests/kafkatest/services/transactional_message_copier.py
@@ -0,0 +1,183 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
import signal

from ducktape.utils.util import wait_until
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from ducktape.cluster.remoteaccount import RemoteCommandError

class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService):
"""This service wraps org.apache.kafka.tools.TransactionalMessageCopier for
use in system testing.
"""
PERSISTENT_ROOT = "/mnt/transactional_message_copier"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stdout")
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stderr")
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
LOG_FILE = os.path.join(LOG_DIR, "transactional_message_copier.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")

logs = {
"transactional_message_copier_stdout": {
"path": STDOUT_CAPTURE,
"collect_default": True},
"transactional_message_copier_stderr": {
"path": STDERR_CAPTURE,
"collect_default": True},
"transactional_message_copier_log": {
"path": LOG_FILE,
"collect_default": True}
}

def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
input_topic, input_partition, output_topic, max_messages=-1,
transaction_size=1000, log_level="INFO"):
super(TransactionalMessageCopier, self).__init__(context, num_nodes)
self.log_level = log_level
self.kafka = kafka
self.transactional_id = transactional_id
self.consumer_group = consumer_group
self.transaction_size = transaction_size
self.input_topic = input_topic
self.input_partition = input_partition
self.output_topic = output_topic
self.max_messages = max_messages
self.message_copy_finished = False
self.consumed = -1
self.remaining = -1
self.stop_timeout_sec = 60

def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
allow_fail=False)
# Create and upload log properties
log_config = self.render('tools_log4j.properties',
log_file=TransactionalMessageCopier.LOG_FILE)
node.account.create_file(TransactionalMessageCopier.LOG4J_CONFIG, log_config)
# Configure security
self.security_config = self.kafka.security_config.client_config(node=node)
self.security_config.setup_node(node)
cmd = self.start_cmd(node, idx)
self.logger.debug("TransactionalMessageCopier %d command: %s" % (idx, cmd))
try:
for line in node.account.ssh_capture(cmd):
line = line.strip()
data = self.try_parse_json(line)
if data is not None:
with self.lock:
self.remaining = int(data["remaining"])
self.consumed = int(data["consumed"])
self.logger.info("%s: consumed %d, remaining %d" %
(self.transactional_id, self.consumed, self.remaining))
if "shutdown_complete" in data:
if self.remaining == 0:
# We are only finished if the number of
# remaining messages at the time of shutdown
# is 0.
#
# Otherwise a clean shutdown would still print
# a 'shutdown complete' message even though
# there are unprocessed messages, causing
# tests to fail.
self.logger.info("%s : Finished message copy" % self.transactional_id)
self.message_copy_finished = True
else:
self.logger.info("%s : Shut down without finishing message copy." %\
self.transactional_id)
except RemoteCommandError as e:
self.logger.debug("Got exception while reading output from copier, \
probably because it was SIGKILL'd (exit code 137): %s" % str(e))

def start_cmd(self, node, idx):
cmd = "export LOG_DIR=%s;" % TransactionalMessageCopier.LOG_DIR
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % TransactionalMessageCopier.LOG4J_CONFIG
cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + "TransactionalMessageCopier"
cmd += " --broker-list %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)
cmd += " --transactional-id %s" % self.transactional_id
cmd += " --consumer-group %s" % self.consumer_group
cmd += " --input-topic %s" % self.input_topic
cmd += " --output-topic %s" % self.output_topic
cmd += " --input-partition %s" % str(self.input_partition)
cmd += " --transaction-size %s" % str(self.transaction_size)
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)

return cmd

def clean_node(self, node, clean_shutdown=True):
self.kill_node(node, clean_shutdown)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

def pids(self, node):
try:
cmd = "ps ax | grep -i TransactionalMessageCopier | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (RemoteCommandError, ValueError) as e:
self.logger.error("Could not list pids: %s" % str(e))
return []

def alive(self, node):
return len(self.pids(node)) > 0

def kill_node(self, node, clean_shutdown=True):
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig)
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop")

def stop_node(self, node, clean_shutdown=True):
self.kill_node(node, clean_shutdown)
stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
(str(node.account), str(self.stop_timeout_sec))

def restart(self, clean_shutdown):
if self.is_done:
return
node = self.nodes[0]
with self.lock:
self.consumed = -1
self.remaining = -1
self.stop_node(node, clean_shutdown)
self.start_node(node)

def try_parse_json(self, string):
"""Try to parse a string as json. Return None if not parseable."""
try:
record = json.loads(string)
return record
except ValueError:
self.logger.debug("Could not parse as json: %s" % str(string))
return None

@property
def is_done(self):
return self.message_copy_finished

def progress_percent(self):
with self.lock:
if self.remaining < 0:
return 0
if self.consumed + self.remaining == 0:
return 100
return (float(self.consumed)/float(self.consumed + self.remaining)) * 100
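A sketch of how a test might drive this service, restarting the copier mid-copy to exercise transaction recovery. The surrounding test harness, topic names, and timeouts are hypothetical; the constructor arguments mirror the __init__ above:

    from ducktape.utils.util import wait_until
    from kafkatest.services.transactional_message_copier import TransactionalMessageCopier

    # Copy partition 0 of "input-topic" to "output-topic" using
    # transactions of 500 messages each.
    copier = TransactionalMessageCopier(self.test_context, num_nodes=1, kafka=self.kafka,
                                        transactional_id="copier-0",
                                        consumer_group="copier-group",
                                        input_topic="input-topic", input_partition=0,
                                        output_topic="output-topic", transaction_size=500)
    copier.start()
    copier.restart(clean_shutdown=True)  # bounce the copier once, cleanly
    wait_until(lambda: copier.is_done, timeout_sec=120,
               err_msg="copier did not finish the message copy")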
