Skip to content
This repository has been archived by the owner on May 5, 2024. It is now read-only.

Commit

Permalink
feat: Add support for whitelisting topic prefixes
Browse files Browse the repository at this point in the history
  • Loading branch information
joschi committed Nov 11, 2022
1 parent 7b5b7ab commit 0626694
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 14 deletions.
9 changes: 9 additions & 0 deletions docs/specification.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ The desired state file consists of:
- **topics** [Optional]:
- **defaults** [Optional]: Specify topic defaults so you don't need to specify them for every topic in the state file. Currently, only replication is supported.
- **blacklist** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
- **whitelist** [Optional]: Add a prefixed topic whitelist for exclusively handling specific topics when using `kafka-gitops`. This allows topics to be exclusively handled and topics not on the list are being ignored, even if they are not defined in the desired state file.

?> `topics.blacklist` and `topics.whitelist` are _not mutually exclusive_ can be used together to whitelist specific topic prefixes and blacklist individual "sub-topics".

?> The blacklist takes precedence over the whitelist, so if a topic name is matched by both, it will be ignored and not deleted if it was not defined in the desired state file.

**Example**:
```yaml
Expand All @@ -35,6 +40,10 @@ settings:
blacklist:
prefixed:
- _confluent
- my-topics-excluded
whitelist:
prefixed:
- my-topics
```
## Topics
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/com/devshawn/kafka/gitops/StateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.devshawn.kafka.gitops.domain.state.settings.Settings;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsCCloud;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopics;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsBlacklist;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsList;
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
Expand All @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -153,7 +154,8 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
private DesiredState getDesiredState() {
DesiredStateFile desiredStateFile = getAndValidateStateFile();
DesiredState.Builder desiredState = new DesiredState.Builder()
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
.addAllPrefixedTopicsToAccept(getPrefixedTopicsToAccept(desiredStateFile));

generateTopicsState(desiredState, desiredStateFile);

Expand Down Expand Up @@ -297,7 +299,7 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getBlacklist)
.map(SettingsTopicsBlacklist::getPrefixed)
.map(SettingsTopicsList::getPrefixed)
.ifPresent(topics::addAll);

desiredStateFile.getServices().forEach((name, service) -> {
Expand All @@ -308,6 +310,16 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
return topics;
}

private List<String> getPrefixedTopicsToAccept(DesiredStateFile desiredStateFile) {
return desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getWhitelist)
.map(SettingsTopicsList::getPrefixed)
.stream()
.flatMap(Collection::stream)
.toList();
}

private GetAclOptions buildGetAclOptions(String serviceName) {
return new GetAclOptions.Builder().setServiceName(serviceName).setDescribeAclEnabled(describeAclEnabled).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface DesiredState {

List<String> getPrefixedTopicsToIgnore();

List<String> getPrefixedTopicsToAccept();

class Builder extends DesiredState_Builder {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public interface SettingsTopics {

Optional<SettingsTopicsDefaults> getDefaults();

Optional<SettingsTopicsBlacklist> getBlacklist();
Optional<SettingsTopicsList> getBlacklist();

Optional<SettingsTopicsList> getWhitelist();

class Builder extends SettingsTopics_Builder {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import java.util.List;

@FreeBuilder
@JsonDeserialize(builder = SettingsTopicsBlacklist.Builder.class)
public interface SettingsTopicsBlacklist {
@JsonDeserialize(builder = SettingsTopicsList.Builder.class)
public interface SettingsTopicsList {

List<String> getPrefixed();

class Builder extends SettingsTopicsBlacklist_Builder {
class Builder extends SettingsTopicsList_Builder {
}
}
20 changes: 13 additions & 7 deletions src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,28 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
desiredPlan.addTopicPlans(topicPlan.build());
});

topics.forEach(currentTopic -> {
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (shouldIgnore) {
LOG.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
return;
for (TopicListing currentTopic : topics) {
boolean acceptTopic = desiredState.getPrefixedTopicsToAccept().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (!desiredState.getPrefixedTopicsToAccept().isEmpty() && !acceptTopic) {
LOG.info("[PLAN] Ignoring topic {} due to missing prefix (whitelist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
boolean ignoreTopic = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (ignoreTopic) {
LOG.info("[PLAN] Ignoring topic {} due to prefix (blacklist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && !desiredState.getTopics().containsKey(currentTopic.name())) {
TopicPlan topicPlan = new TopicPlan.Builder()
.setName(currentTopic.name())
.setAction(PlanAction.REMOVE)
.build();

desiredPlan.addTopicPlans(topicPlan);
}
});
}
}

private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class PlanCommandIntegrationSpec extends Specification {
"seed-topic-modification-no-delete" | true
"seed-acl-exists" | true
"seed-blacklist-topics" | false
"seed-blacklist-whitelist-topics" | false
"seed-whitelist-topics" | false
}

void 'test include unchanged flag - #planName #includeUnchanged'() {
Expand Down
35 changes: 35 additions & 0 deletions src/test/resources/plans/seed-blacklist-whitelist-topics-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "new-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "topic-with-configs-1",
"action": "REMOVE",
"topicDetails": null,
"topicConfigPlans": []
}
],
"aclPlans": [
{
"name": "Unnamed ACL",
"aclDetails": {
"name": "test-topic",
"type": "TOPIC",
"pattern": "LITERAL",
"principal": "User:test",
"host": "*",
"operation": "READ",
"permission": "ALLOW"
},
"action": "REMOVE"
}
]
}
14 changes: 14 additions & 0 deletions src/test/resources/plans/seed-blacklist-whitelist-topics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
settings:
topics:
blacklist:
prefixed:
- test
- topic-with-configs-2
whitelist:
prefixed:
- topic-with

topics:
new-topic:
partitions: 6
replication: 1
35 changes: 35 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "test-new-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "test-topic",
"action": "REMOVE",
"topicDetails": null,
"topicConfigPlans": []
}
],
"aclPlans": [
{
"name": "Unnamed ACL",
"aclDetails": {
"name": "test-topic",
"type": "TOPIC",
"pattern": "LITERAL",
"principal": "User:test",
"host": "*",
"operation": "READ",
"permission": "ALLOW"
},
"action": "REMOVE"
}
]
}
10 changes: 10 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
settings:
topics:
whitelist:
prefixed:
- test

topics:
test-new-topic:
partitions: 6
replication: 1

0 comments on commit 0626694

Please sign in to comment.