This was the first project of the Distributed Computation course, taught at the University of Aveiro in the 2019/2020 academic year, as part of my BSc in Informatics Engineering. The project was done with Miguel Almeida.
The goal of this work was to develop a message broker able to connect producers and consumers through a Pub/Sub protocol and three distinct serialization mechanisms: XML, JSON and pickle. This protocol also needed to be designed and implemented over TCP. Furthermore, the message broker should be able to retain messages, so that when a consumer subscribes to a topic, it receives the last message posted there.
Note: the information presented in both Information and Goals is described in more detail in the project guide, which is in Portuguese.
The message broker we developed fulfills every goal and requirement mentioned above, operating with raw sockets and selectors.
Additionally, the broker implements the notion of hierarchical topics: if a consumer subscribes to a topic, it is also automatically subscribed to all of its child topics. This was accomplished with the implementation of a tree structure.
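The hierarchical subscription and message retention described above can be sketched roughly as follows. This is a minimal illustration and not the code in topicos.py or tree.py: the names `TopicNode`, `TopicTree`, `subscribe` and `publish` are hypothetical, and consumers are represented by plain strings instead of sockets.

```python
class TopicNode:
    """One node of the topic tree (hypothetical structure)."""

    def __init__(self):
        self.children = {}        # child name -> TopicNode
        self.subscribers = set()  # consumers subscribed exactly at this node
        self.last_message = None  # last value published to this exact topic


class TopicTree:
    def __init__(self):
        self.root = TopicNode()

    @staticmethod
    def _parts(topic):
        # "/weather/temp" -> ["weather", "temp"]
        return [part for part in topic.split("/") if part]

    def subscribe(self, topic, consumer):
        """Register the consumer and return the retained message, if any."""
        node = self.root
        for part in self._parts(topic):
            node = node.children.setdefault(part, TopicNode())
        node.subscribers.add(consumer)
        return node.last_message

    def publish(self, topic, value):
        """Store the value and return every consumer that should receive it."""
        node = self.root
        recipients = set(node.subscribers)
        for part in self._parts(topic):
            node = node.children.setdefault(part, TopicNode())
            # Subscribers of every ancestor also receive the message.
            recipients |= node.subscribers
        node.last_message = value
        return recipients
```

In this sketch a subscriber of /weather receives anything published to /weather/temp because `publish` collects subscribers along the whole path from the root. Whether the actual broker also replays the retained messages of child topics to a new parent subscriber is not stated here, so the sketch only returns the message retained at the exact topic.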
This broker is also prepared to handle multiple connections at the same time, be it producers or consumers, and they can join or leave the system at any time without problem.
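As a rough illustration of how a single process can serve several connections with raw sockets and selectors, consider the sketch below. It is not the loop used in broker.py: the helper names `make_listener` and `poll_once` are hypothetical, and the received bytes are simply collected in a list instead of being decoded.

```python
import selectors
import socket


def make_listener(host="127.0.0.1", port=0):
    """Create a non-blocking listening TCP socket (port 0 lets the OS choose)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    sock.setblocking(False)
    return sock


def poll_once(sel, listener, received, timeout=1.0):
    """Handle whatever sockets are ready: accept, read, or clean up."""
    for key, _events in sel.select(timeout):
        if key.fileobj is listener:
            # A new producer or consumer is joining.
            conn, _addr = listener.accept()
            conn.setblocking(False)
            sel.register(conn, selectors.EVENT_READ)
        else:
            conn = key.fileobj
            data = conn.recv(4096)
            if data:
                received.append(data)
            else:
                # An empty read means the peer left; forget the socket.
                sel.unregister(conn)
                conn.close()
```

A broker built this way would call `poll_once` in a loop. Because a departing client only triggers an unregister and a close, other connections are unaffected.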
Any process can connect to the broker by first sending a 1-byte message that indicates the serialization type it will use to communicate.
message value | serialization type |
---|---|
1 | JSON |
2 | XML |
3 | pickle |
The broker will then save this information so that it will be able to decode every message sent by that process.
Every message needs to contain information on its size, so that it can be read correctly. For this, we defined that the first 5 bytes of any message are reserved for its length. The only exception is the message mentioned above, since it can only be 1 byte.
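The length-prefixed framing described above could look roughly like the sketch below. The text only says that the first 5 bytes hold the length, so the encoding used here (a zero-padded ASCII decimal number) is an assumption, and the helper names `frame`, `recv_exact` and `recv_message` are hypothetical rather than taken from middleware.py.

```python
import socket

HEADER_SIZE = 5  # from the protocol: the first 5 bytes carry the payload length


def frame(payload: bytes) -> bytes:
    """Prefix the payload with its length.

    ASSUMPTION: the length is written as zero-padded ASCII decimal digits.
    """
    if len(payload) >= 10 ** HEADER_SIZE:
        raise ValueError("payload too large for a 5-digit length header")
    return f"{len(payload):0{HEADER_SIZE}d}".encode("ascii") + payload


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, since a TCP recv may return fewer than requested."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection mid-message")
        buf.extend(chunk)
    return bytes(buf)


def recv_message(sock: socket.socket) -> bytes:
    """Read one framed message: the header first, then the announced payload."""
    size = int(recv_exact(sock, HEADER_SIZE).decode("ascii"))
    return recv_exact(sock, size)
```

Reading the header first is what lets the receiver split the TCP byte stream back into individual messages, regardless of how the bytes were grouped in transit.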
There are 4 messages that will be accepted by the broker (in the 3 different serialization types):
operation | message format (represented in JSON) |
---|---|
join/subscribe topic | {'OP': 'join', 'TOPIC': <topic(str)>, 'TYPE': <type(int)>} |
publish | {'OP': 'publish', 'VALUE': <value(str)>} |
list topics | {'OP': 'topics_request'} |
leave topic | {'OP' : 'leave_topic'} |
The types are mapped as follows:
type value | type |
---|---|
1 | CONSUMER |
2 | PRODUCER |
The topics (represented with <topic(str)> in the table) follow the common hierarchical directory representation /parent_topic/child_topic/... .
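To make the three serialization types more concrete, the sketch below encodes and decodes a join message in each of them. The JSON and pickle cases use the standard library directly. The XML layout (a `message` root element with one child per key) is an assumption made for illustration and may differ from what utils.py produces, and the function names `encode` and `decode` are hypothetical.

```python
import json
import pickle
import xml.etree.ElementTree as ET

# Serialization type values from the handshake table.
JSON, XML, PICKLE = 1, 2, 3


def encode(msg: dict, fmt: int) -> bytes:
    if fmt == JSON:
        return json.dumps(msg).encode("utf-8")
    if fmt == PICKLE:
        return pickle.dumps(msg)
    if fmt == XML:
        # ASSUMPTION: one child element per key under a <message> root.
        root = ET.Element("message")
        for key, value in msg.items():
            ET.SubElement(root, key).text = str(value)
        return ET.tostring(root, encoding="utf-8")
    raise ValueError(f"unknown serialization type: {fmt}")


def decode(data: bytes, fmt: int) -> dict:
    if fmt == JSON:
        return json.loads(data.decode("utf-8"))
    if fmt == PICKLE:
        return pickle.loads(data)
    if fmt == XML:
        # Every value comes back as a string in this XML layout.
        return {child.tag: child.text for child in ET.fromstring(data)}
    raise ValueError(f"unknown serialization type: {fmt}")
```

Note that in this XML layout the integer `TYPE` comes back as the string `"1"`, so a receiver would need to convert it. Also, `pickle.loads` can execute arbitrary code when given untrusted input, which is acceptable for a course project but not for a broker exposed to unknown clients.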
To finish, this next table shows the format of the messages sent by the broker (also in the 3 different serialization types):
function | message format (represented in JSON) |
---|---|
list existing topics | {'OP' : 'topics_list', 'LIST' : <topic_list(str)>} |
send published message | {'TOPIC': <topic(str)>, 'VALUE': value} |
Here, the topic list (represented with topic_list(str)) is a string with each existing topic separated by the \n character. For example, 'temp\nmsg' represents two topics: temp and msg.
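Building and reading that newline-separated string is straightforward. A minimal sketch, with the hypothetical helper names `format_topic_list` and `parse_topic_list`:

```python
def format_topic_list(topics):
    """Join topic names into the newline-separated string sent in 'LIST'."""
    return "\n".join(topics)


def parse_topic_list(topic_list):
    """Split the 'LIST' string back into topic names (empty string -> no topics)."""
    return topic_list.split("\n") if topic_list else []
```

For the example above, `parse_topic_list('temp\nmsg')` gives the two topics temp and msg.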
To start, the code can sometimes be confusing and unintuitive, with some blocks unnecessarily duplicated. The documentation could also be improved, not to mention the mix of Portuguese and English in variable names.
Even though this is a bit outside the scope of the project, there are some clear improvements to the protocol that should be considered. For example, it should be possible for publishers to publish to many topics, selecting which one(s) in every publish message. The consumer should also be able to leave a specific topic, instead of being forced to unsubscribe from all of them.
The current code to simulate the consumers and producers is quite limited, as we weren't supposed to edit these files for submission. These processes should be improved in order to accommodate all the functionalities of the message broker. Furthermore, some properties, such as the server port, could be defined with command-line arguments.
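As an illustration of the last suggestion, a port option could be added with `argparse` along these lines. This is only a sketch of a possible improvement: the `--port` flag and the `parse_args` helper are hypothetical and do not exist in the current code.

```python
import argparse


def parse_args(argv=None):
    """Parse command-line options; --port is a hypothetical new flag."""
    parser = argparse.ArgumentParser(description="Message broker")
    parser.add_argument(
        "--port",
        type=int,
        default=8000,  # keeps the port the broker currently uses
        help="TCP port the broker listens on",
    )
    return parser.parse_args(argv)
```

Keeping 8000 as the default means existing ways of starting the processes would continue to work unchanged.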
It would also be interesting to introduce unit and integration tests to the project.
The steps below describe how to test the system.
You should have at least Python 3.8 installed on your machine. You can find out how to install it on the official page.
Then you just need to clone the repository:
git clone https://github.com/immarianaas/cd-message-broker.git
To start the message broker, you just need to execute broker.py:
python3 broker.py
By default, port 8000 will be used. Currently it is not possible to select which port to use dynamically.
To run the example consumer process, you need to execute consumer.py, which will connect to the broker on port 8000. We can provide it with an argument to select which topic it is going to join:
optional argument | description | default |
---|---|---|
--type | type of producer: [temp, msg, weather] | temp |
If the argument isn't one of the allowed values, an error will be shown and the process will stop.
python3 consumer.py --type msg
To run the example producer process, you need to execute producer.py, which will also connect to the broker on port 8000. We can provide it with two arguments to select both the topic and the number of messages that will be sent:
optional argument | description | default |
---|---|---|
--type | type of producer: [temp, msg, weather] | temp |
--length | number of messages to be sent | 10 |
python3 producer.py --type msg --length 5
Note: these last two files were not changed much from the base code provided to the students, since they were not part of the assignment.
file | description |
---|---|
Relatorio1.pdf | Report of the project, where the code and protocol are explained, in Portuguese |
Projecto 1 - CD2020.pdf | Project guide provided by the professors, also in Portuguese |
broker.py | Core functionality for the message broker |
topicos.py | Topic tree implementation |
tree.py | Alternative topic tree implementation |
consumer.py | Consumer logic for testing purposes |
producer.py | Producer logic for testing purposes |
middleware.py | Common functionality for sending and receiving messages, as well as the marshalling process |
utils.py | Helper function to handle XML data |
Distributed under the MIT License. See LICENSE for more information.
DETI - Departamento de Eletrónica, Telecomunicações e Informática