Skip to content

Commit

Permalink
Detect divergences between local state and the remote cluster current…
Browse files Browse the repository at this point in the history
… status (kafka-ops#478)

* detect changes in topics deleted from the cluster, but not from the local state

* small code reorg

* add code to detect divergences when acls are deleted in the remote cluster, but not mapped in the local state, if using one

* add support for detecting divergences when connectors are deleted from the remote deployment, but still in the local state file

* add support for detecting divergences when ksql artifacts are deleted from the remote deployment, but still in the local state file

* refactor

* introduce an specific exception type for when a divergence between local and remote state is found
  • Loading branch information
purbon authored Apr 7, 2022
1 parent f6fe353 commit a8b5de8
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 32 deletions.
41 changes: 36 additions & 5 deletions src/main/java/com/purbon/kafka/topology/AccessControlManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.purbon.kafka.topology.actions.access.CreateBindings;
import com.purbon.kafka.topology.actions.access.builders.*;
import com.purbon.kafka.topology.actions.access.builders.rbac.*;
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Component;
import com.purbon.kafka.topology.model.DynamicUser;
import com.purbon.kafka.topology.model.JulieRoles;
Expand All @@ -29,6 +30,7 @@
import java.io.PrintStream;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -84,13 +86,42 @@ public void updatePlan(ExecutionPlan plan, final Map<String, Topology> topologie
.forEach(plan::add);
}

private Set<TopologyAclBinding> loadActualClusterStateIfAvailable(ExecutionPlan plan) {
private Set<TopologyAclBinding> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
Set<TopologyAclBinding> bindings =
config.fetchStateFromTheCluster() ? providerBindings() : plan.getBindings();
return bindings.stream()
.filter(this::matchesManagedPrefixList)
.filter(this::isNotInternalAcl)
.collect(Collectors.toSet());
var currentState =
bindings.stream()
.filter(this::matchesManagedPrefixList)
.filter(this::isNotInternalAcl)
.collect(Collectors.toSet());

if (!config.fetchStateFromTheCluster()) {
// should detect if there are divergences between the local cluster state and the current
// status in the cluster
detectDivergencesInTheRemoteCluster(plan);
}

return currentState;
}

private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan)
throws RemoteValidationException {
var remoteAcls = providerBindings();

var delta =
plan.getBindings().stream()
.filter(acl -> !remoteAcls.contains(acl))
.collect(Collectors.toList());

if (delta.size() > 0) {
String errorMessage =
"Your remote state has changed since the last execution, this ACL(s): "
+ StringUtils.join(delta, ",")
+ " are in your local state, but not in the cluster, please investigate!";
LOGGER.error(errorMessage);
throw new RemoteValidationException(errorMessage);
}
}

private boolean isNotInternalAcl(TopologyAclBinding binding) {
Expand Down
37 changes: 35 additions & 2 deletions src/main/java/com/purbon/kafka/topology/ArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.purbon.kafka.topology.actions.CreateArtefactAction;
import com.purbon.kafka.topology.actions.DeleteArtefactAction;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.Topology;
import java.io.IOException;
Expand Down Expand Up @@ -88,8 +89,40 @@ protected ArtefactClient selectClient(Artefact artefact) {
return clients.getOrDefault(artefact.getServerLabel(), defaultClient);
}

abstract Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException;
protected Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
var currentState = config.fetchStateFromTheCluster() ? getClustersState() : getLocalState(plan);

if (!config.fetchStateFromTheCluster()) {
// should detect if there are divergences between the local cluster state and the current
// status in the cluster
detectDivergencesInTheRemoteCluster(plan);
}

return currentState;
}

private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan) throws IOException {
var remoteArtefacts = getClustersState();

var delta =
getLocalState(plan).stream()
.filter(localArtifact -> !remoteArtefacts.contains(localArtifact))
.collect(Collectors.toList());

if (delta.size() > 0) {
String errorMessage =
"Your remote state has changed since the last execution, these Artefact(s): "
+ StringUtils.join(delta, ",")
+ " are in your local state, but not in the cluster, please investigate!";
LOGGER.error(errorMessage);
throw new RemoteValidationException(errorMessage);
}
}

protected abstract Collection<? extends Artefact> getLocalState(ExecutionPlan plan);

protected abstract Collection<? extends Artefact> getClustersState() throws IOException;

abstract Set<? extends Artefact> parseNewArtefacts(Topology topology);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ public KSqlArtefactManager(
}

@Override
Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
return config.fetchStateFromTheCluster() ? getClustersState() : plan.getKSqlArtefacts();
protected Collection<? extends Artefact> getLocalState(ExecutionPlan plan) {
return plan.getKSqlArtefacts();
}

@Override
Expand Down Expand Up @@ -83,7 +82,8 @@ protected List<? extends Artefact> findArtefactsToBeDeleted(
return toDeleteArtefactsList;
}

private Collection<? extends Artefact> getClustersState() throws IOException {
@Override
protected Collection<? extends Artefact> getClustersState() throws IOException {
List<Either> list =
clients.values().stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,30 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaConnectArtefactManager extends ArtefactManager {

private static final Logger LOGGER = LogManager.getLogger(KafkaConnectArtefactManager.class);

public KafkaConnectArtefactManager(
ArtefactClient client, Configuration config, String topologyFileOrDir) {
super(client, config, topologyFileOrDir);
}

@Override
protected Collection<? extends Artefact> getLocalState(ExecutionPlan plan) {
return plan.getConnectors();
}

public KafkaConnectArtefactManager(
Map<String, KConnectApiClient> clients, Configuration config, String topologyFileOrDir) {
super(clients, config, topologyFileOrDir);
}

@Override
Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan plan)
throws IOException {
return config.fetchStateFromTheCluster() ? getClustersState() : plan.getConnectors();
}

private Collection<? extends Artefact> getClustersState() throws IOException {
protected Collection<? extends Artefact> getClustersState() throws IOException {
List<Either> list =
clients.values().stream()
.map(
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/purbon/kafka/topology/TopicManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.purbon.kafka.topology.actions.topics.UpdateTopicConfigAction;
import com.purbon.kafka.topology.actions.topics.builders.TopicConfigUpdatePlanBuilder;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.schemas.SchemaRegistryManager;
Expand Down Expand Up @@ -126,9 +127,32 @@ private Set<String> loadActualClusterStateIfAvailable(ExecutionPlan plan) throws
LOGGER.debug(
"Full list of managed topics in the cluster: "
+ StringUtils.join(new ArrayList<>(listOfTopics), ","));

if (!config.fetchStateFromTheCluster()) {
// verify that the remote state does not contain different topics than the local state
detectDivergencesInTheRemoteCluster(plan);
}

return listOfTopics;
}

private void detectDivergencesInTheRemoteCluster(ExecutionPlan plan) throws IOException {
var remoteTopics = adminClient.listApplicationTopics();
var delta =
plan.getTopics().stream()
.filter(localTopic -> !remoteTopics.contains(localTopic))
.collect(Collectors.toList());

if (delta.size() > 0) {
String errorMessage =
"Your remote state has changed since the last execution, this topics: "
+ StringUtils.join(delta, ",")
+ " are in your local state, but not in the cluster, please investigate!";
LOGGER.error(errorMessage);
throw new RemoteValidationException(errorMessage);
}
}

private boolean matchesPrefixList(Topic topic) {
return matchesPrefixList(topic.toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.purbon.kafka.topology.exceptions;

import java.io.IOException;

/**
* Exception raised when a remote validation error has happened. For example, when there are
* discrepancies between local state and the remote state (someone delete a topic outside of
* JulieOps)
*/
public class RemoteValidationException extends IOException {
public RemoteValidationException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ private void testNoAclsDeleted() throws IOException {

Mockito.reset(aclsProvider);
plan = ExecutionPlan.init(backendController, mockPrintStream);
doReturn(mapBindings(plan)).when(aclsProvider).listAcls();

accessControlManager.updatePlan(builder.buildTopology(), plan);
plan.run();

Expand Down Expand Up @@ -544,6 +546,8 @@ private void testAclsDelete() throws IOException {

Mockito.reset(aclsProvider);
plan = ExecutionPlan.init(backendController, mockPrintStream);
doReturn(mapBindings(plan)).when(aclsProvider).listAcls();

Topology topology = builder.buildTopology();
accessControlManager.updatePlan(topology, plan);
plan.run();
Expand All @@ -555,6 +559,15 @@ private void testAclsDelete() throws IOException {
verify(aclsProvider, times(1)).clearBindings(new HashSet<>(bindingsToDelete));
}

private HashMap<String, List<TopologyAclBinding>> mapBindings(ExecutionPlan plan) {
var allBindings = new HashMap<String, List<TopologyAclBinding>>();
for (var binding : plan.getBindings()) {
allBindings.computeIfAbsent(binding.getResourceName(), k -> new ArrayList<>());
allBindings.get(binding.getResourceName()).add(binding);
}
return allBindings;
}

private List<TopologyAclBinding> returnAclsForConsumers(List<Consumer> consumers, String topic) {

List<AclBinding> acls = newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
import com.purbon.kafka.topology.utils.TestUtils;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -37,7 +38,12 @@ public MyArtefactManager(
}

@Override
Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan plan) {
protected Collection<? extends Artefact> getLocalState(ExecutionPlan plan) {
return null;
}

@Override
protected Collection<? extends Artefact> getClustersState() throws IOException {
return new ArrayList<>();
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/purbon/kafka/topology/TopicManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public void topicPartitionCountUpdateTest() throws IOException {
project.addTopic(topicB);

doReturn(new Config(Collections.emptyList())).when(adminClient).getActualTopicConfig(any());
var listOfTopics = new HashSet<>(Arrays.asList(topicA.toString(), topicB.toString()));
doReturn(listOfTopics).when(adminClient).listApplicationTopics();
topicManager.updatePlan(topology, plan);
plan.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,32 @@ public void consumerAclsCreation() throws ExecutionException, InterruptedExcepti
verifyConsumerAcls(consumers);
}

@Test(expected = IOException.class)
public void shouldDetectChangesInTheRemoteClusterBetweenRuns() throws IOException {
TopologyBuilderAdminClient adminClient = new TopologyBuilderAdminClient(kafkaAdminClient);

var topology =
TestTopologyBuilder.createProject()
.addTopic("topic1")
.addTopic("topic2")
.addConsumer("User:foo")
.buildTopology();

accessControlManager.updatePlan(topology, plan);
plan.run();

System.out.println("****");
adminClient.fetchAclsList().values().forEach(System.out::println);
System.out.println("****");

var binding =
TopologyAclBinding.build(
"TOPIC", "ctx.project.topic2", "*", "DESCRIBE", "User:foo", "LITERAL");
adminClient.clearAcls(binding);

accessControlManager.updatePlan(topology, plan);
}

@Test
public void producerAclsCreation() throws ExecutionException, InterruptedException, IOException {

Expand Down
Loading

0 comments on commit a8b5de8

Please sign in to comment.