Skip to content

Commit

Permalink
Add topology metadata support for topics and users (kafka-ops#200)
Browse files Browse the repository at this point in the history
* Add topology metadata support for topics and users

* Add documentation on the metadata feature

Co-authored-by: Aksel Hilde <[email protected]>
  • Loading branch information
akselh and Aksel Hilde authored Jan 21, 2021
1 parent 796245a commit 5e01435
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 14 deletions.
53 changes: 53 additions & 0 deletions docs/the-descriptor-files.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,59 @@ The relevant properties are:

more details can be found in the :doc:`config-values` section where the most important configuration details are explained.

What can you do in the descriptor files?
----------------------------------------

The `example` directory in the project contains a set of example topologies that you can use to get an idea of how to
use the different features. For example:
* RBAC roles with Confluent Platform
* Administration of schemas for the topics
* Using plans to create common topic configurations
* Add metadata to different elements in the topology

Add metadata to your topology
-----------------------------

In addition to the definitions needed for creating the required topics, ACLs, RBAC bindings etc. in your Kafka cluster,
KTB also supports annotating your topology with metadata. This can make the descriptor files easier to read,
e.g. for someone not familiar with the details of what a given topic is all about.

But you can also utilise it to do the following:
* Add validations that also use metadata. E.g. combine topic metadata and topic name to enforce more refined topic naming rules.
* Generate documentation from your descriptor files (with some tool) and include the metadata to create better documentation on topics and users.

You can add metadata to the following elements in the topology:
* topics
* consumers
* producers
* streams
* connectors

A short example is given below, have a look in the `example` directory for a more complete example.

.. code-block:: YAML
---
context: "context"
projects:
- name: "foo with metadata"
consumers:
- principal: "User:App0"
metadata:
system: "System0"
producers:
- principal: "User:App1"
metadata:
system: "System1"
topics:
- name: "topicA"
metadata:
domain: "Sales"
owner: "DepartmentA"
config:
replication.factor: "3"
num.partitions: "3"
Manage only topics, the optional files
--------------------------------------

Expand Down
51 changes: 51 additions & 0 deletions example/descriptor-with-metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
context: "context"
projects:
- name: "foo with metadata"
consumers:
- principal: "User:App0"
metadata:
system: "System0"
producers:
- principal: "User:App1"
metadata:
system: "System1"
streams:
- principal: "User:StreamsApp1"
metadata:
contactInfo: "[email protected]"
topics:
read:
- "topicA"
write:
- "topicB"
connectors:
- principal: "User:Connect1"
metadata:
system: "System3"
contactInfo: "[email protected]"
status_topic: "status"
offset_topic: "offset"
configs_topic: "configs"
topics:
read:
- "topicB"
topics:
- name: "topicA"
metadata:
domain: "Sales"
owner: "DepartmentA"
consumers:
- principal: "User:App4"
metadata:
system: "System4"
config:
replication.factor: "1"
num.partitions: "1"
- name: "topicB"
metadata:
domain: "ProductionStatistics"
owner: "DepartmentB"
dataType: "avro"
schemas:
value.schema.file: "schemas/bar-value.avsc"
10 changes: 10 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/Impl/TopicImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class TopicImpl implements Topic, Cloneable {
@JsonInclude(Include.NON_EMPTY)
private String plan;

private Map<String, String> metadata;

private Map<String, String> config;
@JsonIgnore private TopologyBuilderConfig appConfig;
@JsonIgnore private Map<String, Object> context;
Expand Down Expand Up @@ -230,6 +232,14 @@ public void addAppConfig(TopologyBuilderConfig appConfig) {
this.appConfig = appConfig;
}

/**
 * Returns the free-form metadata annotations attached to this topic, or {@code null} when the
 * descriptor declared none.
 */
public Map<String, String> getMetadata() {
  return metadata;
}

/**
 * Sets the free-form metadata annotations for this topic. Invoked by the custom topic
 * deserializer when a {@code metadata} attribute is present in the descriptor file.
 */
public void setMetadata(Map<String, String> metadata) {
  this.metadata = metadata;
}

@Override
public Topic clone() {
try {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,9 @@ public interface Topic {

void setProducers(List<Producer> producers);

/** Returns the optional free-form metadata annotations attached to this topic. */
Map<String, String> getMetadata();

/** Attaches free-form metadata annotations (key/value pairs from the descriptor) to this topic. */
void setMetadata(Map<String, String> metadata);

SubjectNameStrategy getSubjectNameStrategy();
}
11 changes: 11 additions & 0 deletions src/main/java/com/purbon/kafka/topology/model/User.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.purbon.kafka.topology.model;

import java.util.Map;

public class User {

private String principal;
private Map<String, String> metadata;

public User() {
this("");
Expand All @@ -19,4 +22,12 @@ public String getPrincipal() {
/** Sets the Kafka principal (e.g. {@code "User:App0"}) this user represents. */
public void setPrincipal(String principal) {
  this.principal = principal;
}

/**
 * Returns the free-form metadata annotations attached to this user, or {@code null} when the
 * descriptor declared none.
 */
public Map<String, String> getMetadata() {
  return metadata;
}

/** Sets the free-form metadata annotations for this user; populated during deserialisation. */
public void setMetadata(Map<String, String> metadata) {
  this.metadata = metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,7 @@ public TopicImpl deserialize(JsonParser parser, DeserializationContext context)
Optional<JsonNode> optionalDataTypeNode = Optional.ofNullable(rootNode.get("dataType"));
Optional<String> optionalDataType = optionalDataTypeNode.map(JsonNode::asText);

Optional<JsonNode> optionalConfigNode = Optional.ofNullable(rootNode.get("config"));
Map<String, String> config =
optionalConfigNode
.map(
node -> {
Map<String, String> map = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> it = node.fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
map.put(entry.getKey(), entry.getValue().asText());
}
return map;
})
.orElse(new HashMap<>());
Map<String, String> config = getMap(rootNode.get("config"));

Optional<JsonNode> optionalPlanLabel = Optional.ofNullable(rootNode.get("plan"));
if (optionalPlanLabel.isPresent() && plans.size() == 0) {
Expand Down Expand Up @@ -154,11 +141,30 @@ public TopicImpl deserialize(JsonParser parser, DeserializationContext context)

topic.setSchemas(schemas);

Map<String, String> metadata = getMap(rootNode.get("metadata"));
topic.setMetadata(metadata);

LOGGER.debug(
String.format("Topic %s with config %s has been created", topic.getName(), config));
return topic;
}

/**
 * Converts the fields of the given JSON object node into a {@code String -> String} map, using
 * each field's text representation ({@link JsonNode#asText()}) as the value.
 *
 * <p>Used for the optional {@code config} and {@code metadata} attributes of a topic, so the
 * node may legitimately be absent.
 *
 * @param jsonNode the object node to read fields from; may be {@code null} when the attribute is
 *     missing from the descriptor
 * @return a mutable map with one entry per field; empty when {@code jsonNode} is {@code null} or
 *     has no fields
 */
private Map<String, String> getMap(JsonNode jsonNode) {
  // A plain null guard is clearer than Optional.ofNullable(...).map(...).orElse(...), and avoids
  // eagerly allocating the orElse() default map even when the node is present.
  Map<String, String> map = new HashMap<>();
  if (jsonNode != null) {
    Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields();
    while (it.hasNext()) {
      Map.Entry<String, JsonNode> entry = it.next();
      map.put(entry.getKey(), entry.getValue().asText());
    }
  }
  return map;
}

private Function<JsonNode, Either<ValidationException, TopicSchemas>> validateAndBuildSchemas(
Topic topic) {
return node -> {
Expand Down
17 changes: 17 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ public void setup() {
parser = new TopologySerdes();
}

@Test
public void testMetadata() {
  // Verifies that metadata maps declared in the descriptor are deserialised onto consumers,
  // producers, streams, connectors and topics (including topic-level consumers). Asserting the
  // entry values (not just key presence) ensures the values actually round-trip from the YAML.
  Topology topology =
      parser.deserialise(TestUtils.getResourceFile("/descriptor-with-metadata.yaml"));
  Project project = topology.getProjects().get(0);

  assertThat(project.getConsumers().get(0).getMetadata()).containsEntry("system", "OwnerSystem0");
  assertThat(project.getProducers().get(0).getMetadata()).containsKey("contactInfo");
  assertThat(project.getStreams().get(0).getMetadata()).containsEntry("system", "OwnerSystemS1");
  assertThat(project.getConnectors().get(0).getMetadata())
      .containsEntry("system", "OwnerSystemC1");
  assertThat(project.getTopics().get(0).getMetadata())
      .containsEntry("domain", "SomeInformationModelDomain");
  assertThat(project.getTopics().get(1).getMetadata())
      .containsEntry("domain", "SomeInformationModelDomain");
  assertThat(project.getTopics().get(1).getMetadata())
      .containsEntry("owner", "DataOwnerDepartment");
  assertThat(project.getTopics().get(0).getConsumers().get(0).getMetadata())
      .containsEntry("system", "OwnerSystem4");
}

@Test
public void testDynamicFirstLevelAttributes() {
Topology topology =
Expand Down
50 changes: 50 additions & 0 deletions src/test/resources/descriptor-with-metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
context: "contextOrg"
source: "source"
projects:
- name: "foo with metadata"
consumers:
- principal: "User:App0"
metadata:
system: "OwnerSystem0"
producers:
- principal: "User:App1"
metadata:
contactInfo: "[email protected]"
streams:
- principal: "User:StreamsApp1"
metadata:
system: "OwnerSystemS1"
topics:
read:
- "topicA"
write:
- "topicB"
connectors:
- principal: "User:Connect1"
metadata:
system: "OwnerSystemC1"
status_topic: "status"
offset_topic: "offset"
configs_topic: "configs"
topics:
read:
- "topicB"
topics:
- name: "topicA"
metadata:
domain: "SomeInformationModelDomain"
consumers:
- principal: "User:App4"
metadata:
system: "OwnerSystem4"
config:
replication.factor: "1"
num.partitions: "1"
- name: "topicB"
metadata:
domain: "SomeInformationModelDomain"
owner: "DataOwnerDepartment"
dataType: "avro"
schemas:
value.schema.file: "schemas/bar-value.avsc"

0 comments on commit 5e01435

Please sign in to comment.