Skip to content

Commit

Permalink
KAFKA-6166: Streams configuration requires consumer. and producer. in…
Browse files Browse the repository at this point in the history
… order to be read (apache#4434)

* Implement method to get custom properties
* Add custom properties to getConsumerConfigs and getProducerConfigs
* Add tests

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
h314to authored and guozhangwang committed Jan 30, 2018
1 parent 710aa67 commit cb93d76
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ private Map<String, Object> getCommonConsumerConfigs() {
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
consumerProps.putAll(getClientCustomProps());
consumerProps.putAll(clientProvidedProps);

// bootstrap.servers should be from StreamsConfig
Expand Down Expand Up @@ -832,6 +833,7 @@ public Map<String, Object> getProducerConfigs(final String clientId) {

// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
Expand All @@ -847,7 +849,11 @@ public Map<String, Object> getProducerConfigs(final String clientId) {
* @return Map of the admin client configuration.
*/
public Map<String, Object> getAdminConfigs(final String clientId) {
final Map<String, Object> props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());

final Map<String, Object> props = new HashMap<>();
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);

// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin");
Expand All @@ -862,6 +868,26 @@ private Map<String, Object> getClientPropsWithPrefix(final String prefix,
return props;
}

/**
* Get a map of custom configs by removing from the originals all the Streams, Consumer, Producer, and AdminClient configs.
* Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String, Set)}.
* This allows to set a custom property for a specific client alone if specified using a prefix, or for all
* when no prefix is used.
*
* @return a map with the custom properties
*/
private Map<String, Object> getClientCustomProps() {
final Map<String, Object> props = originals();
props.keySet().removeAll(CONFIG.names());
props.keySet().removeAll(ConsumerConfig.configNames());
props.keySet().removeAll(ProducerConfig.configNames());
props.keySet().removeAll(AdminClientConfig.configNames());
props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet());
props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet());
return props;
}

/**
* Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde
* class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
Expand All @@ -66,7 +67,6 @@ public void setUp() {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put("DUMMY", "dummy");
props.put("key.deserializer.encoding", "UTF8");
props.put("value.deserializer.encoding", "UTF-16");
streamsConfig = new StreamsConfig(props);
Expand All @@ -90,7 +90,6 @@ public void testGetProducerConfigs() {
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
assertNull(returnedProps.get("DUMMY"));
}

@Test
Expand All @@ -101,7 +100,6 @@ public void testGetConsumerConfigs() {
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
assertNull(returnedProps.get("DUMMY"));
}

@Test
Expand Down Expand Up @@ -148,7 +146,6 @@ public void testGetRestoreConsumerConfigs() {
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
assertNull(returnedProps.get("DUMMY"));
}

@Test
Expand Down Expand Up @@ -264,6 +261,37 @@ public void shouldSupportNonPrefixedProducerConfigs() {
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}

@Test
public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put("custom.property.host", "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
assertEquals("host", consumerConfigs.get("custom.property.host"));
assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
assertEquals("host", producerConfigs.get("custom.property.host"));
assertEquals("host", adminConfigs.get("custom.property.host"));
}

@Test
public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put("custom.property.host", "host0");
props.put(consumerPrefix("custom.property.host"), "host1");
props.put(producerPrefix("custom.property.host"), "host2");
props.put(adminClientPrefix("custom.property.host"), "host3");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
assertEquals("host1", consumerConfigs.get("custom.property.host"));
assertEquals("host1", restoreConsumerConfigs.get("custom.property.host"));
assertEquals("host2", producerConfigs.get("custom.property.host"));
assertEquals("host3", adminConfigs.get("custom.property.host"));
}

@Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.RETRIES_CONFIG, 10);
Expand Down

0 comments on commit cb93d76

Please sign in to comment.