diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5156403
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright 2017 Roy Myers
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index f0e2eb0..3fa3a56 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,104 @@
-Simple kafka/flask server.
-Tutorial to come.
+# Simple Kafka/Flask server
+A couple of examples of a Python app that uses Kafka in a Docker container, the kafka-python library, and Flask as a web server. The intent is to have several branches covering different scenarios that utilize Kafka.
+
+This repository handles a couple of Kafka use cases. In an incredibly simple overview:
+ * a __Producer__ sends a message to __Kafka__
+ * __Kafka__ accepts the message and then looks for a consumer
+ * a __Consumer__ grabs the message from __Kafka__ to process
+
+One very loose metaphor for a Kafka system is subscribing to a magazine. The company (Producer) prints the magazine. They have your address but don't know how to get it to you.
+
+The company delivers a stack of magazines to UPS (Kafka). UPS has no knowledge of what is in the magazine, but they know how to get it to the reader's (Consumer) address. UPS is in charge of delivering, tracking, and keeping the magazine safe.
+
+The reader (Consumer) has a mailbox with their address on the side, so UPS drops off the magazine.
+
+There are a couple of things to note here.
+ * Kafka messages are organized by topics
+ * A _consumer_ and _producer_ both read/write to the same topic
+ * You can think of a topic like an address in the above scenario. It could be a business with multiple people working at the same location, or it could be a single home with one occupant.
+ * Kafka does a ton of work in the background that I will not get into in this repo. It handles replication, logging, and the like. The [Official Docs](https://kafka.apache.org/) are a good resource if you would like to learn more.
+
+This repo provides a basis for 2 different __Producer__ scenarios.
+ 1. A running Python Flask server that accepts a POST request and then opens a connection to Kafka. The server sends the POST body to Kafka as JSON and closes the connection after the message is sent.
+    * [server.py](server.py)
+ 2. A Python script that spawns a thread that opens a connection to Kafka and sends two messages every few seconds for 17 seconds.
+    * [kafkaProducerService.py](kafkaProducerService.py)
+
+This repo provides a basis for 1 __Consumer__ scenario.
+ 1. A Python script that spawns a thread that runs for 20 seconds and opens a connection to listen on a topic. The thread prints all messages to stdout, then closes the connection after the 20 seconds.
+    * [kafkaConsumerService.py](kafkaConsumerService.py)
+
+This is not a tutorial for using/configuring Kafka. To keep things simple, I use the spotify/kafka image, which has all the needed functionality inside one container. This also forgoes a lot of the benefits that Kafka can provide. To make it easy, I have included scripts to get the container(s) up and running. The scripts have comments, and I implore you to read the commands and be very familiar with what you are running.
+_Tutorial to come and be linked._
+
+
+## Getting Started
+
+These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system `//TODO`.
+
+### Prerequisites
+
+What things you need to install the software and how to install them.
+
+These apps require a couple of Docker containers to run properly. Aside from Docker itself, the scripts should be all you need to handle the set-up.
+Only one of the containers is necessary: the one running Kafka/Zookeeper. A script is also provided to run a Kafka consumer that prints Kafka messages for testing/debugging. For these reasons I didn't create a docker-compose config.
+```
+scripts/createTopicOnExistingKafka.sh - Executes a command on the running Kafka container to create a topic.
+scripts/listTopicOnExistingKafka.sh - Checks to make sure the topic was created. It runs a command on an existing Docker container and lists all topics.
+scripts/listenToKafkaTopic.sh - Spawns a temporary Docker container to consume from the Kafka topic. It prints the messages to stdout and exits on interrupt.
+scripts/startMasterZookeeperContainer.sh - Creates a Docker network. Starts 1 container in the background that runs Zookeeper and Kafka. This is the only necessary container and should be created first.
+```
+
+Order to run:
+ * `startMasterZookeeperContainer.sh` > Output is the container ID
+ * `createTopicOnExistingKafka.sh` > Output is something along the lines of "Topic created"
+ * `listTopicOnExistingKafka.sh` > Output is something along the lines of "current topics :"
+ * `listenToKafkaTopic.sh` > Output is nothing at first, then the Kafka messages as they get consumed. The container exits when you press Ctrl+C
+
+You will end up with a persistent container running Kafka and Zookeeper in the background, and a container printing to the terminal, relaying the messages on a Kafka topic.
+
+## Installing
+
+A step-by-step series of examples that tell you how to get a development environment running. With Python I always suggest a virtual environment.
+
+__The docker container must be running for the producers to connect.__
+
+Install the python packages
+
+```
+pip install -r requirements.txt
+```
+
+To run the consumer, run
+```
+python kafkaConsumerService.py #Remember this only runs for 20 seconds
+```
+
+Then you can run the producers.
+
+```
+python server.py #Runs indefinitely but only sends a message when it receives a POST
+```
+or
+```
+python kafkaProducerService.py #Remember this only runs for 17 seconds
+```
+
+
+
+## Authors
+
+* **Roy Myers** - *Initial work* - [myersr](https://github.com/myersr)
+
+
+## License
+
+This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details
 
 ----
-## Links
+## Links and Thanks!
 [kafka-python](http://kafka-python.readthedocs.io/en/master/)
 [Flask](http://flask.pocoo.org/)
 [Example kafka-python program/server](https://github.com/dpkp/kafka-python/blob/master/example.py)
@@ -14,3 +109,4 @@ Tutorial to come.
 Test if Zookeeper has registered a broker: `echo dump | nc localhost 2181 | grep brokers` and look for an ID
 To run the Spotify container, use `docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=kafka-spotify --env ADVERTISED_PORT=9092 --name kafka-spotify spotify/kafka` instead of the supplied command, as I don't have docker-machine.
+Please reach out!
diff --git a/kafkaConsumerService.py b/kafkaConsumerService.py
new file mode 100644
index 0000000..a0580eb
--- /dev/null
+++ b/kafkaConsumerService.py
@@ -0,0 +1,96 @@
+import threading, logging, time
+import multiprocessing
+import os
+
+from kafka import KafkaConsumer
+
+
+#Process class that continuously consumes messages from a Kafka topic
+# Input:
+#   multiprocessing.Process = inherits from Process to run in its own process
+class Consumer(multiprocessing.Process):
+    #default initializing function of the process.
+    # Input:
+    #   takes self to modify and initialize
+    def __init__(self):
+        #initializes the process and passes self. Magic multiprocessing stuff
+        multiprocessing.Process.__init__(self)
+        #Gives the process an event called stop_event so it can be interrupted.
+        self.stop_event = multiprocessing.Event()
+
+    #Function to stop the process
+    def stop(self):
+        #Sets the stop_event event.
+        #This gives context to the process from the outside and lets you stop it.
+        self.stop_event.set()
+
+    #The main run function called when you call start.
+    def run(self):
+        #os.getppid is only available on Unix; fall back to our own pid elsewhere
+        if hasattr(os, 'getppid'):
+            print 'parent process:', os.getppid()
+            procID = os.getppid()
+        else:
+            procID = os.getpid()
+        #Bootstraps an instance of a Kafka consumer.
+        #Initializes the consumer and identifies the docker server.
+        #kafka-spotify is listed in /etc/hosts with the ip of the container
+        #Input:
+        #   topic to subscribe to: 'test'
+        #   client id to identify the consumer; should be unique to the connection
+        #   servers Kafka is advertising as
+        #   where to start when there is no stored offset: 'latest' only reads new
+        #     messages, while 'earliest' would start from the oldest available one
+        #   timeout so iteration returns control and the stop event can be checked
+        consumer = KafkaConsumer('test',
+                                 client_id='python-consumer-%s' % (procID),
+                                 bootstrap_servers=['kafka-spotify:9092'],
+                                 auto_offset_reset='latest',
+                                 consumer_timeout_ms=1000)
+
+        #Alternative way to subscribe to a topic
+        #consumer.subscribe(['test'])
+
+        #loop until the process is stopped by checking the stop event
+        while not self.stop_event.is_set():
+            #Loop through ConsumerRecord objects in the consumer object
+            for message in consumer:
+                #print the message with the client ID, current topic, message offset,
+                #and the value decoded from the bytes it was sent as
+                print ("python-consumer-%s processed message: %s:%d: value=%s" % (procID, message.topic,
+                                                                                  message.offset, message.value.decode('utf-8')))
+                #break out of the for loop if the process was notified of closure
+                if self.stop_event.is_set():
+                    break
+
+        #Close the TCP connection to kafka
+        consumer.close()
+
+
+#Main function called when the app is run
+def main():
+    #initialize a Consumer object/process
+    kafkConsumer = Consumer()
+
+    #Start the process working.
+    kafkConsumer.start()
+
+    #Sleep for 20 seconds. If we weren't using a separate process this would halt the code
+    time.sleep(20)
+
+    #Call stop to set the event so the process knows to stop
+    print("Stopping kafkConsumer")
+    kafkConsumer.stop()
+
+    #Wait until the process terminates. See the docs for more
+    #https://docs.python.org/2/library/threading.html?highlight=thread#threading.Thread.join
+    print("Waiting for execution to halt")
+    kafkConsumer.join()
+
+#the logic to run as a process
+if __name__ == "__main__":
+    #Set logging format and level
+    #logging.basicConfig(
+    #    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+    #    level=logging.INFO)
+
+    #Call the main function
+    main()
+
diff --git a/kafkaProducerService.py b/kafkaProducerService.py
index 95b7a3e..4c4b7c6 100644
--- a/kafkaProducerService.py
+++ b/kafkaProducerService.py
@@ -4,45 +4,68 @@
 from kafka import KafkaProducer
 
-def sendStreamM(topicN, tContent):
-    producer = KafkaProducer(bootstrap_servers='172.20.0.2:9092')
-    producer.send(topicN, tContent)
-    producer.close()
-
+#Thread class that continuously sends messages to a Kafka message queue
+# Input:
+#   threading.Thread = inherits from Thread to run in its own thread
 class Producer(threading.Thread):
+    #default initializing function of the thread.
+    # Input:
+    #   takes self to modify and initialize
     def __init__(self):
+        #initializes the thread and passes self. Magic multithreading stuff
         threading.Thread.__init__(self)
+        #Gives the thread an event called stop_event so it can be interrupted.
         self.stop_event = threading.Event()
-    
+
+    #Function to stop the thread
     def stop(self):
+        #Sets the stop_event event.
+        #This gives context to the thread from the outside and lets you stop it.
         self.stop_event.set()
-    
+
+    #The main run function called when you call start.
     def run(self):
-        producer = KafkaProducer(bootstrap_servers='172.20.0.2:9092')
-
-        #while not self.stop_event.is_set():
-        #    producer.send('test', b"test")
-        #    producer.send('test', b"\xc2Hola, mundo!")
-        #    time.sleep(3)
+        #Bootstraps an instance of a Kafka producer.
+        #Initializes the producer and identifies the docker server.
+        #kafka-spotify is listed in /etc/hosts with the ip of the container
+        producer = KafkaProducer(bootstrap_servers='kafka-spotify:9092')
+        #loop until the thread is stopped by checking the stop event
+        while not self.stop_event.is_set():
+            #Send two messages of type bytes to the 'test' topic
+            producer.send('test', b"test")
+            producer.send('test', b"Hola, mundo!")
+            #Sleep for 3 seconds
+            time.sleep(3)
+
+        #Close the TCP stream to Kafka
         producer.close()
 
+#Main function called when the app is run
 def main():
+    #initialize a producer object/thread
     kafkProducer = Producer()
+    #Start the thread working.
     kafkProducer.start()
+    #Sleep for 17 seconds. If we weren't using threads this would halt the code
     time.sleep(17)
+    #Call stop to set the thread event so it knows to stop
     kafkProducer.stop()
+    #Wait until the thread terminates. See the docs for more
+    #https://docs.python.org/2/library/threading.html?highlight=thread#threading.Thread.join
     kafkProducer.join()
-
+#the logic to run as a process
 if __name__ == "__main__":
+    #Set logging format and level
     logging.basicConfig(
         format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
         level=logging.INFO
     )
+    #Call the main function
     main()
diff --git a/scripts/createTopicOnExistingKafka.sh b/scripts/createTopicOnExistingKafka.sh
new file mode 100644
index 0000000..2478339
--- /dev/null
+++ b/scripts/createTopicOnExistingKafka.sh
@@ -0,0 +1,3 @@
+# Runs a command on the existing and running container kafka-spotify
+# Hits zookeeper and creates the topic test
+docker exec kafka-spotify /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
diff --git a/scripts/listTopicOnExistingKafka.sh b/scripts/listTopicOnExistingKafka.sh
new file mode 100644
index 0000000..ee53a6d
--- /dev/null
+++ b/scripts/listTopicOnExistingKafka.sh
@@ -0,0 +1,3 @@
+# Runs a command on the existing running container kafka-spotify
+# Tells zookeeper to list all topics and then it exits
+docker exec kafka-spotify /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
diff --git a/scripts/listenToKafkaTopic.sh b/scripts/listenToKafkaTopic.sh
new file mode 100644
index 0000000..428a9a8
--- /dev/null
+++ b/scripts/listenToKafkaTopic.sh
@@ -0,0 +1,5 @@
+# Starts a docker container on the kafka-net docker network
+# Exits on Ctrl+C
+# From the spotify/kafka docker image
+# Listens on topic test
+docker run -it --rm --network kafka-net spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka-spotify:9092 --topic test --from-beginning
diff --git a/scripts/startMasterZookeeperContainer.sh b/scripts/startMasterZookeeperContainer.sh
new file mode 100644
index 0000000..cd0c59b
--- /dev/null
+++ b/scripts/startMasterZookeeperContainer.sh
@@ -0,0 +1,10 @@
+# Starts a container in the background from spotify/kafka
+# Runs Zookeeper and Kafka in the same container
+# ADVERTISED_HOST
+#   is set to the container name as it is the advertised domain name
+#   the container tries to connect to itself at this domain
+#   MUST add the record ` kafka-spotify` to /etc/hosts
+# On kafka-net so the domain record is shared by containers
+# Named kafka-spotify. This must be the same as the ADVERTISED_HOST var
+docker network create -d bridge kafka-net
+docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=kafka-spotify --env ADVERTISED_PORT=9092 --network kafka-net --name kafka-spotify spotify/kafka
diff --git a/server.py b/server.py
index 0d2c0e6..619cfb2 100644
--- a/server.py
+++ b/server.py
@@ -5,21 +5,36 @@
 #producer = KafkaProducer(bootstrap_servers='host:port',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
 
+#Initialize the Flask app
 app = Flask(__name__)
+#VERY UNSAFE for production.
+#Allows flask to accept cross-origin requests for local development
+CORS(app)
 
+#Register a function @ / that accepts GET or POST
+# defaults to http://localhost:5000/
 @app.route("/", methods=["GET","POST"])
-def hello():
+def hello(): #hello() is registered to route /
+    #Gets the POST body as a JSON object
     print request.get_json()
     body = request.get_json()
-    producer = KafkaProducer(bootstrap_servers='172.20.0.2:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    #Bootstraps an instance of a Kafka producer.
+    #Initializes the producer and identifies the docker server.
+    #kafka-spotify is listed in /etc/hosts with the ip of the container
+    #Sets the producer serializer to JSON
+    producer = KafkaProducer(bootstrap_servers='kafka-spotify:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    #Send a message to the kafka topic 'test'
+    #passes the POST JSON body as the message
     producer.send('test', body)
+    #Closes the TCP stream to Kafka
     producer.close()
+    #Returns a "Complete" string
     return "Complete"
 
+#Main process function
 if __name__ == "__main__":
+    #Bind to all interfaces (0.0.0.0) so the server is reachable from outside
     app.run('0.0.0.0')
-    print "After app.run"
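Both services in this diff share the same shutdown pattern: a worker loops until a stop `Event` is set from the outside, then the caller `join()`s it. A minimal, broker-free sketch of that pattern (the `Worker` class and its list-based stand-in for `producer.send` are illustrative, not part of the repo; shown in Python 3 syntax):

```python
import threading
import time

class Worker(threading.Thread):
    """Same stop pattern as the repo's Producer/Consumer, minus Kafka."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()
        self.sent = []  # stand-in for producer.send(...)

    def stop(self):
        # Set the event; the run() loop sees it on its next check and exits.
        self.stop_event.set()

    def run(self):
        while not self.stop_event.is_set():
            self.sent.append(b"test")
            time.sleep(0.05)

worker = Worker()
worker.start()
time.sleep(0.2)   # let the worker loop a few times
worker.stop()     # signal shutdown
worker.join()     # wait for run() to return
print(len(worker.sent) > 0)  # prints True
```

The `consumer_timeout_ms` setting in kafkaConsumerService.py serves the same purpose as the short `sleep` here: it makes the blocking loop return control periodically so the stop event is actually checked.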
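server.py hands `KafkaProducer` a `value_serializer` of `lambda v: json.dumps(v).encode('utf-8')`. What that serializer does to a POST body before it goes on the wire can be shown in isolation, with no broker involved (the example body is hypothetical):

```python
import json

# The same serializer expression server.py passes to KafkaProducer.
serializer = lambda v: json.dumps(v).encode('utf-8')

# Hypothetical POST body, standing in for request.get_json().
body = {"user": "alice", "action": "subscribe"}

payload = serializer(body)
print(type(payload).__name__)   # prints bytes: Kafka message values are raw bytes
print(payload.decode('utf-8'))  # prints {"user": "alice", "action": "subscribe"}
```

This is why kafkaConsumerService.py calls `message.value.decode('utf-8')` on the consuming side: the bytes must be decoded (and, for JSON producers, parsed) to recover the original structure.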
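The topic/address metaphor from the README can also be sketched with a toy in-memory broker: producers drop messages at a named topic and consumers pick them up by the same name, with neither side knowing about the other. `ToyBroker` is purely illustrative and has none of Kafka's persistence, partitioning, or replication:

```python
from collections import defaultdict, deque

class ToyBroker:
    """Toy stand-in for Kafka: messages are grouped by topic name."""

    def __init__(self):
        self.topics = defaultdict(deque)

    def send(self, topic, message):
        # Producer side: drop the message at the topic "address".
        self.topics[topic].append(message)

    def poll(self, topic):
        # Consumer side: pick up the next message for that topic, if any.
        queue = self.topics[topic]
        return queue.popleft() if queue else None

broker = ToyBroker()
# The same two messages kafkaProducerService.py sends each cycle.
broker.send('test', b"test")
broker.send('test', b"Hola, mundo!")

print(broker.poll('test'))  # prints b'test'
print(broker.poll('test'))  # prints b'Hola, mundo!'
print(broker.poll('test'))  # prints None: topic is drained
```

The decoupling is the point: `send` and `poll` only agree on the topic name, just as the repo's producer and consumer only agree on `'test'`.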