This is an example of the Camel Kafka Connector AWS2 Kinesis Firehose sink connector.
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
You’ll need to set up the plugin.path property in your Kafka Connect configuration.
Open the $KAFKA_HOME/config/connect-standalone.properties file and set the plugin.path property to your chosen location.
In this example we’ll use /home/oscerd/connectors/.
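With that location, the relevant line in connect-standalone.properties looks like this:
plugin.path=/home/oscerd/connectors/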
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-firehose-kafka-connector/0.11.5/camel-aws2-kinesis-firehose-kafka-connector-0.11.5-package.tar.gz
> tar -xvzf camel-aws2-kinesis-firehose-kafka-connector-0.11.5-package.tar.gz
On the AWS console, create a Kinesis Firehose delivery stream named firehose-stream and choose an S3 bucket (in the same region) from the available list as its destination. Choose 1 MB as the buffer size and 60 seconds as the buffer interval.
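If you prefer the CLI over the console, a roughly equivalent command is sketched below; the role and bucket ARNs are placeholders and assume you already have a destination bucket and an IAM role that Firehose can assume:
aws firehose create-delivery-stream \
  --delivery-stream-name firehose-stream \
  --s3-destination-configuration "RoleARN=arn:aws:iam::123456789012:role/firehose-delivery-role,BucketARN=arn:aws:s3:::my-firehose-bucket,BufferingHints={SizeInMBs=1,IntervalInSeconds=60}"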
Now it’s time to set up the connectors.
Open the AWS2 Kinesis Firehose configuration file (config/CamelAWS2KinesisFirehoseSinkConnector.properties):
name=CamelAWS2KinesisFirehoseSinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2kinesisfirehose.CamelAws2kinesisfirehoseSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.path.streamName=firehose-stream
camel.component.aws2-kinesis-firehose.accessKey=xxxx
camel.component.aws2-kinesis-firehose.secretKey=yyyy
camel.component.aws2-kinesis-firehose.region=eu-west-1
and add the correct credentials for AWS.
Now you can run the example
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisFirehoseSinkConnector.properties
Just connect to your AWS Console and check the S3 bucket
On a different terminal, run the Kafka console producer and send messages to your Kafka broker:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
Kafka to Kinesis Firehose message 1
Kafka to Kinesis Firehose message 2
You should see an S3 object created every 60 seconds and, in it, the messages concatenated.
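If you prefer the CLI, you can also list the objects Firehose delivered with the following command (the bucket name is a placeholder for the bucket you selected):
aws s3 ls s3://my-firehose-bucket/ --recursive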
First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. We need to create security objects as part of installation so it is necessary to switch to admin user. If you use Minishift, you can do it with the following command:
oc login -u system:admin
We will use the OpenShift project myproject. If it doesn’t exist yet, you can create it using the following command:
oc new-project myproject
If the project already exists, you can switch to it with:
oc project myproject
We can now install the Strimzi operator into this project:
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml
Next we will deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the Debezium connectors installed:
# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml
# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
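Before moving on you may want to wait until the broker is actually ready; assuming the Kafka custom resource from the example file above is named my-cluster (as in the Strimzi example), a readiness check could be:
oc wait kafka/my-cluster --for=condition=Ready --timeout=300s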
Optionally, enable the ability to instantiate Kafka connectors through a specific custom resource:
oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
Strimzi uses Source2Image (S2I) builds to allow users to add their own connectors to the existing Strimzi Docker images.
We now need to build the connectors and add them to the image. If you have built the whole project (mvn clean package), decompress the connectors you need into a folder (e.g. my-connectors/) so that each one is in its own subfolder. Alternatively, you can download the latest officially released and packaged connectors from Maven, like this:
> cd my-connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-firehose-kafka-connector/0.11.5/camel-aws2-kinesis-firehose-kafka-connector-0.11.5-package.tar.gz
> tar -xvzf camel-aws2-kinesis-firehose-kafka-connector-0.11.5-package.tar.gz
Now we can start the build
oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. Once it is done, we can check that the connectors are available in our Kafka Connect cluster. Strimzi is running Kafka Connect in a distributed mode.
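To follow the rollout, assuming the DeploymentConfig created by the S2I build is named my-connect-cluster-connect, you can run something like:
oc rollout status dc/my-connect-cluster-connect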
To check the available connector plugins, you can run the following command:
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
You should see something like this:
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.aws2kinesisfirehose.CamelAws2kinesisfirehoseSinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
You can also provide the AWS credentials as a secret. To do so, edit the file config/openshift/aws2-kinesis-firehose-cred.properties with the correct credentials and then execute the following command:
oc create secret generic aws2-kinesis-firehose --from-file=config/openshift/aws2-kinesis-firehose-cred.properties
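The credentials file is a plain properties file; as a sketch (the exact property names are an assumption here and must match whatever your connector configuration references), it could look like:
accessKey=xxxx
secretKey=yyyy
region=eu-west-1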
Now we need to edit the KafkaConnectS2I custom resource to reference the secret. For example:
spec:
  # ...
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  #...
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws2-kinesis-firehose
In this way the secret aws2-kinesis-firehose will be mounted as a volume with the path /opt/kafka/external-configuration/aws-credentials/
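The connector configuration can then reference individual entries of the mounted file through the FileConfigProvider placeholder syntax, for example (assuming the file contains accessKey, secretKey and region entries as sketched above):
camel.component.aws2-kinesis-firehose.accessKey=${file:/opt/kafka/external-configuration/aws-credentials/aws2-kinesis-firehose-cred.properties:accessKey}
camel.component.aws2-kinesis-firehose.secretKey=${file:/opt/kafka/external-configuration/aws-credentials/aws2-kinesis-firehose-cred.properties:secretKey}
camel.component.aws2-kinesis-firehose.region=${file:/opt/kafka/external-configuration/aws-credentials/aws2-kinesis-firehose-cred.properties:region}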
Now we can create an instance of the AWS2 Kinesis Firehose sink connector:
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
  "name": "kinesis-firehose-sink-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2kinesisfirehose.CamelAws2kinesisfirehoseSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "kinesis-firehose-topic",
    "camel.sink.path.streamName": "firehose-stream",
    "camel.component.aws2-kinesis-firehose.accessKey": "xxx",
    "camel.component.aws2-kinesis-firehose.secretKey": "xxx",
    "camel.component.aws2-kinesis-firehose.region": "xxx"
  }
}
EOF
Alternatively, if you have enabled use-connector-resources, you can create the connector instance by creating a specific custom resource:
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: kinesis-firehose-sink-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2kinesisfirehose.CamelAws2kinesisfirehoseSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kinesis-firehose-topic
    camel.sink.path.streamName: firehose-stream
    camel.component.aws2-kinesis-firehose.accessKey: xxxx
    camel.component.aws2-kinesis-firehose.secretKey: yyyy
    camel.component.aws2-kinesis-firehose.region: region
EOF
If you followed the optional step for secret credentials you can run the following command:
oc apply -f config/openshift/aws2-kinesis-firehose-sink.yaml
You can check the status of the connector using
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/kinesis-firehose-sink-connector/status
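The response should report the connector and its task in the RUNNING state; trimmed down, it looks roughly like this (worker_id values will differ):
{"name":"kinesis-firehose-sink-connector","connector":{"state":"RUNNING","worker_id":"..."},"tasks":[{"id":0,"state":"RUNNING","worker_id":"..."}],"type":"sink"}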
Just connect to your AWS Console and check the content of the S3 bucket you configured as the delivery stream destination.
On a different terminal, run the Kafka console producer and send messages to your Kafka broker:
oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kinesis-firehose-topic
Kafka to Kinesis Firehose message 1
Kafka to Kinesis Firehose message 2
Kafka to Kinesis Firehose message 3
Kafka to Kinesis Firehose message 4
Kafka to Kinesis Firehose message 5
You should see an S3 object created every 60 seconds and, in it, the messages concatenated.