Skip to content

Commit

Permalink
KAFKA-2826: Make Kafka Connect ducktape services easier to extend.
Browse files Browse the repository at this point in the history
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#522 from ewencp/kafka-2826-extensible-connect-services
  • Loading branch information
ewencp authored and gwenshap committed Nov 13, 2015
1 parent 2802bd0 commit 969d0cb
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ def __init__(self, context, kafka, files):
def node(self):
return self.nodes[0]

def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " ".join(connector_configs)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd

def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)

Expand All @@ -164,10 +171,7 @@ def start_node(self, node):

self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE +
"/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) +
" ".join(remote_connector_configs) +
(" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)))
node.account.ssh(self.start_cmd(node, remote_connector_configs))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))

if len(self.pids(node)) == 0:
Expand All @@ -182,6 +186,12 @@ def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offs
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic

def start_cmd(self, node):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd

def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)

Expand All @@ -192,10 +202,7 @@ def start_node(self, node):

self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
node.account.ssh(cmd)
node.account.ssh(self.start_cmd(node))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))

if len(self.pids(node)) == 0:
Expand Down

0 comments on commit 969d0cb

Please sign in to comment.