diff --git a/docs/the-descriptor-files.rst b/docs/the-descriptor-files.rst
index a82475e9f..a130b5c84 100644
--- a/docs/the-descriptor-files.rst
+++ b/docs/the-descriptor-files.rst
@@ -65,6 +65,62 @@ The relevant properties are:
 more details can be found in the :doc:`config-values` section where the
 most important configuration details are explained.
 
+What can you do in the descriptor files?
+-----------
+
+The `example` directory in the project contains a set of example topologies that you can use to get an idea of how to
+use the different features. For example:
+
+* RBAC roles with Confluent Platform
+* Administration of schemas for the topics
+* Using plans to create common topic configurations
+* Adding metadata to different elements in the topology
+
+Add metadata to your topology
+-----------
+
+In addition to the definitions needed for creating the required topics, ACLs, RBAC bindings, etc. in your Kafka cluster,
+KTB also supports annotating your topology with metadata. This can make the descriptor files easier to read,
+e.g. for someone who is not familiar with the details of what a given topic is all about.
+
+You can also use it to do the following:
+
+* Add validations that also use metadata, e.g. combine topic metadata and topic name to enforce more refined topic naming rules.
+* Generate documentation from your descriptor files (with some tool) and include the metadata to create better documentation on topics and users.
+
+You can add metadata to the following elements in the topology:
+
+* topics
+* consumers
+* producers
+* streams
+* connectors
+
+A short example is given below; have a look in the `example` directory for a more complete example.
+
+.. code-block:: YAML
+
+    ---
+    context: "context"
+    projects:
+      - name: "foo with metadata"
+        consumers:
+          - principal: "User:App0"
+            metadata:
+              system: "System0"
+        producers:
+          - principal: "User:App1"
+            metadata:
+              system: "System1"
+        topics:
+          - name: "topicA"
+            metadata:
+              domain: "Sales"
+              owner: "DepartmentA"
+            config:
+              replication.factor: "3"
+              num.partitions: "3"
+
 Manage only topics, the optional files
 -----------
 
diff --git a/example/descriptor-with-metadata.yaml b/example/descriptor-with-metadata.yaml
new file mode 100644
index 000000000..f06114ede
--- /dev/null
+++ b/example/descriptor-with-metadata.yaml
@@ -0,0 +1,51 @@
+---
+context: "context"
+projects:
+  - name: "foo with metadata"
+    consumers:
+      - principal: "User:App0"
+        metadata:
+          system: "System0"
+    producers:
+      - principal: "User:App1"
+        metadata:
+          system: "System1"
+    streams:
+      - principal: "User:StreamsApp1"
+        metadata:
+          contactInfo: "app1@company.com"
+        topics:
+          read:
+            - "topicA"
+          write:
+            - "topicB"
+    connectors:
+      - principal: "User:Connect1"
+        metadata:
+          system: "System3"
+          contactInfo: "system3@company.com"
+        status_topic: "status"
+        offset_topic: "offset"
+        configs_topic: "configs"
+        topics:
+          read:
+            - "topicB"
+    topics:
+      - name: "topicA"
+        metadata:
+          domain: "Sales"
+          owner: "DepartmentA"
+        consumers:
+          - principal: "User:App4"
+            metadata:
+              system: "System4"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+      - name: "topicB"
+        metadata:
+          domain: "ProductionStatistics"
+          owner: "DepartmentB"
+        dataType: "avro"
+        schemas:
+          value.schema.file: "schemas/bar-value.avsc"
diff --git a/src/main/java/com/purbon/kafka/topology/model/Impl/TopicImpl.java b/src/main/java/com/purbon/kafka/topology/model/Impl/TopicImpl.java
index 9ab68747a..d4b7cd95b 100644
--- a/src/main/java/com/purbon/kafka/topology/model/Impl/TopicImpl.java
+++ b/src/main/java/com/purbon/kafka/topology/model/Impl/TopicImpl.java
@@ -35,6 +35,8 @@ public class TopicImpl implements Topic, Cloneable {
   @JsonInclude(Include.NON_EMPTY)
   private String plan;
 
+  private Map<String, String> metadata;
+
   private Map<String, String> config;
   @JsonIgnore private TopologyBuilderConfig appConfig;
   @JsonIgnore private Map<String, String> context;
@@ -230,6 +232,14 @@ public void addAppConfig(TopologyBuilderConfig appConfig) {
     this.appConfig = appConfig;
   }
 
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
+
   @Override
   public Topic clone() {
     try {
diff --git a/src/main/java/com/purbon/kafka/topology/model/Topic.java b/src/main/java/com/purbon/kafka/topology/model/Topic.java
index 65b933c54..5e0ba10b1 100644
--- a/src/main/java/com/purbon/kafka/topology/model/Topic.java
+++ b/src/main/java/com/purbon/kafka/topology/model/Topic.java
@@ -45,5 +45,9 @@ public interface Topic {
 
   void setProducers(List<Producer> producers);
 
+  Map<String, String> getMetadata();
+
+  void setMetadata(Map<String, String> metadata);
+
   SubjectNameStrategy getSubjectNameStrategy();
 }
diff --git a/src/main/java/com/purbon/kafka/topology/model/User.java b/src/main/java/com/purbon/kafka/topology/model/User.java
index 03a3e4cf4..d74fb994b 100644
--- a/src/main/java/com/purbon/kafka/topology/model/User.java
+++ b/src/main/java/com/purbon/kafka/topology/model/User.java
@@ -1,8 +1,11 @@
 package com.purbon.kafka.topology.model;
 
+import java.util.Map;
+
 public class User {
 
   private String principal;
+  private Map<String, String> metadata;
 
   public User() {
     this("");
@@ -19,4 +22,12 @@ public String getPrincipal() {
   public void setPrincipal(String principal) {
     this.principal = principal;
   }
+
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
 }
diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java b/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java
index f7a10115a..fed7a18fe 100644
--- a/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java
+++ b/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java
@@ -74,20 +74,7 @@ public TopicImpl deserialize(JsonParser parser, DeserializationContext context)
     Optional<JsonNode> optionalDataTypeNode = Optional.ofNullable(rootNode.get("dataType"));
     Optional<String> optionalDataType = optionalDataTypeNode.map(JsonNode::asText);
 
-    Optional<JsonNode> optionalConfigNode = Optional.ofNullable(rootNode.get("config"));
-    Map<String, String> config =
-        optionalConfigNode
-            .map(
-                node -> {
-                  Map<String, String> map = new HashMap<>();
-                  Iterator<Map.Entry<String, JsonNode>> it = node.fields();
-                  while (it.hasNext()) {
-                    Map.Entry<String, JsonNode> entry = it.next();
-                    map.put(entry.getKey(), entry.getValue().asText());
-                  }
-                  return map;
-                })
-            .orElse(new HashMap<>());
+    Map<String, String> config = getMap(rootNode.get("config"));
 
     Optional<JsonNode> optionalPlanLabel = Optional.ofNullable(rootNode.get("plan"));
     if (optionalPlanLabel.isPresent() && plans.size() == 0) {
@@ -154,11 +141,30 @@ public TopicImpl deserialize(JsonParser parser, DeserializationContext context)
     topic.setSchemas(schemas);
 
+    Map<String, String> metadata = getMap(rootNode.get("metadata"));
+    topic.setMetadata(metadata);
+
     LOGGER.debug(
         String.format("Topic %s with config %s has been created", topic.getName(), config));
 
     return topic;
   }
 
+  private Map<String, String> getMap(JsonNode jsonNode) {
+    Optional<JsonNode> optionalNode = Optional.ofNullable(jsonNode);
+    return optionalNode
+        .map(
+            node -> {
+              Map<String, String> map = new HashMap<>();
+              Iterator<Map.Entry<String, JsonNode>> it = node.fields();
+              while (it.hasNext()) {
+                Map.Entry<String, JsonNode> entry = it.next();
+                map.put(entry.getKey(), entry.getValue().asText());
+              }
+              return map;
+            })
+        .orElse(new HashMap<>());
+  }
+
   private Function<JsonNode, List<TopicSchemas>> validateAndBuildSchemas(
       Topic topic) {
     return node -> {
diff --git a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
index 458de9077..2f2664160 100644
--- a/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
+++ b/src/test/java/com/purbon/kafka/topology/TopologySerdesTest.java
@@ -42,6 +42,23 @@ public void setup() {
     parser = new TopologySerdes();
   }
 
+  @Test
+  public void testMetadata() {
+    Topology topology =
+        parser.deserialise(TestUtils.getResourceFile("/descriptor-with-metadata.yaml"));
+    Project project = topology.getProjects().get(0);
+
+    assertThat(project.getConsumers().get(0).getMetadata()).containsKey("system");
+    assertThat(project.getProducers().get(0).getMetadata()).containsKey("contactInfo");
+    assertThat(project.getStreams().get(0).getMetadata()).containsKey("system");
+    assertThat(project.getConnectors().get(0).getMetadata()).containsKey("system");
+    assertThat(project.getTopics().get(0).getMetadata()).containsKey("domain");
+    assertThat(project.getTopics().get(1).getMetadata()).containsKey("domain");
+    assertThat(project.getTopics().get(1).getMetadata()).containsKey("owner");
+    assertThat(project.getTopics().get(0).getConsumers().get(0).getMetadata())
+        .containsKey("system");
+  }
+
   @Test
   public void testDynamicFirstLevelAttributes() {
     Topology topology =
diff --git a/src/test/resources/descriptor-with-metadata.yaml b/src/test/resources/descriptor-with-metadata.yaml
new file mode 100644
index 000000000..4c6c320df
--- /dev/null
+++ b/src/test/resources/descriptor-with-metadata.yaml
@@ -0,0 +1,50 @@
+---
+context: "contextOrg"
+source: "source"
+projects:
+  - name: "foo with metadata"
+    consumers:
+      - principal: "User:App0"
+        metadata:
+          system: "OwnerSystem0"
+    producers:
+      - principal: "User:App1"
+        metadata:
+          contactInfo: "app1@company.com"
+    streams:
+      - principal: "User:StreamsApp1"
+        metadata:
+          system: "OwnerSystemS1"
+        topics:
+          read:
+            - "topicA"
+          write:
+            - "topicB"
+    connectors:
+      - principal: "User:Connect1"
+        metadata:
+          system: "OwnerSystemC1"
+        status_topic: "status"
+        offset_topic: "offset"
+        configs_topic: "configs"
+        topics:
+          read:
+            - "topicB"
+    topics:
+      - name: "topicA"
+        metadata:
+          domain: "SomeInformationModelDomain"
+        consumers:
+          - principal: "User:App4"
+            metadata:
+              system: "OwnerSystem4"
+        config:
+          replication.factor: "1"
+          num.partitions: "1"
+      - name: "topicB"
+        metadata:
+          domain: "SomeInformationModelDomain"
+          owner: "DataOwnerDepartment"
+        dataType: "avro"
+        schemas:
+          value.schema.file: "schemas/bar-value.avsc"
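
The docs hunk above suggests combining topic metadata with topic names to enforce more refined naming rules. A minimal, self-contained sketch of such a check follows; it is only an illustration, not part of this change and not KTB's validation API. The class and method names are made up, and the rule (topic names must start with the lower-cased `domain` metadata value) is an assumption; the `domain` and `owner` keys mirror the metadata used in the examples above. Inside KTB itself, a validation along these lines would read the annotations through the `Topic#getMetadata()` accessor introduced in this diff.

.. code-block:: java

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, illustrative check: combines a topic's name with its metadata,
    // in the spirit of the "validations that also use metadata" bullet above.
    public class MetadataNamingCheck {

      // Returns true when the topic name starts with the lower-cased "domain" metadata
      // value, e.g. domain "Sales" expects a name such as "sales.orders".
      public static boolean hasDomainPrefix(String topicName, Map<String, String> metadata) {
        String domain = metadata.get("domain");
        if (domain == null) {
          return false; // topic is not annotated with a domain, nothing to check against
        }
        return topicName.toLowerCase().startsWith(domain.toLowerCase() + ".");
      }

      public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("domain", "Sales");
        metadata.put("owner", "DepartmentA");

        System.out.println(hasDomainPrefix("sales.orders", metadata));    // true
        System.out.println(hasDomainPrefix("inventory.stock", metadata)); // false
      }
    }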