Skip to content

Commit

Permalink
KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support…
Browse files Browse the repository at this point in the history
… for connector header, key, and value converters (apache#14309)

Reviewers:  Greg Harris <[email protected]>
  • Loading branch information
C0urante authored May 7, 2024
1 parent 21bf715 commit 05df104
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
Expand Down Expand Up @@ -79,6 +81,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -90,6 +93,10 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;

/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
* must invoke the lifecycle hooks appropriately.
Expand Down Expand Up @@ -392,6 +399,161 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}

/**
* General-purpose validation logic for converters that are configured directly
* in a connector config (as opposed to inherited from the worker config).
* @param connectorConfig the configuration for the connector; may not be null
* @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
* may be null, in which case no validation will be performed under the assumption that the
* connector will use inherit the converter settings from the worker. Some errors encountered
* during validation may be {@link ConfigValue#addErrorMessage(String) added} to this object
* @param pluginInterface the interface for the plugin type
* (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
* may not be null
* @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
* from an instance of the plugin type (e.g., {@code Converter::config});
* may not be null
* @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
* may not be null
* @param pluginProperty the property used to define a custom class for the plugin type
* in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
* may not be null
* @param defaultProperties any default properties to include in the configuration that will be used for
* the plugin; may be null
* @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
* or null if either no custom validation was performed (possibly because no custom plugin was defined in the
* connector config), or if custom validation failed
* @param <T> the plugin class to perform validation for
*/
private <T> ConfigInfos validateConverterConfig(
Map<String, String> connectorConfig,
ConfigValue pluginConfigValue,
Class<T> pluginInterface,
Function<T, ConfigDef> configDefAccessor,
String pluginName,
String pluginProperty,
Map<String, String> defaultProperties,
Function<String, TemporaryStage> reportStage
) {
Objects.requireNonNull(connectorConfig);
Objects.requireNonNull(pluginInterface);
Objects.requireNonNull(configDefAccessor);
Objects.requireNonNull(pluginName);
Objects.requireNonNull(pluginProperty);

String pluginClass = connectorConfig.get(pluginProperty);

if (pluginClass == null
|| pluginConfigValue == null
|| !pluginConfigValue.errorMessages().isEmpty()
) {
// Either no custom converter was specified, or one was specified but there's a problem with it.
// No need to proceed any further.
return null;
}

T pluginInstance;
String stageDescription = "instantiating the connector's " + pluginName + " for validation";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
} catch (ClassNotFoundException | RuntimeException e) {
log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}

try {
ConfigDef configDef;
stageDescription = "retrieving the configuration definition from the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configDef = configDefAccessor.apply(pluginInstance);
} catch (RuntimeException e) {
log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}
if (configDef == null) {
log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass);
// Older versions of Connect didn't do any converter validation.
// Even though converters are technically required to return a non-null ConfigDef object from their config() method,
// we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement,
// can be used successfully with a connector.
return null;
}
final String pluginPrefix = pluginProperty + ".";
Map<String, String> pluginConfig = Utils.entriesWithPrefix(connectorConfig, pluginPrefix);
if (defaultProperties != null)
defaultProperties.forEach(pluginConfig::putIfAbsent);

List<ConfigValue> configValues;
stageDescription = "performing config validation for the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configValues = configDef.validate(pluginConfig);
} catch (RuntimeException e) {
log.error("Failed to perform config validation for {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to perform config validation for " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}

return prefixedConfigInfos(configDef.configKeys(), configValues, pluginPrefix);
} finally {
Utils.maybeCloseQuietly(pluginInstance, pluginName + " " + pluginClass);
}
}

private ConfigInfos validateHeaderConverterConfig(
Map<String, String> connectorConfig,
ConfigValue headerConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
headerConverterConfigValue,
HeaderConverter.class,
HeaderConverter::config,
"header converter",
HEADER_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
reportStage
);
}

private ConfigInfos validateKeyConverterConfig(
Map<String, String> connectorConfig,
ConfigValue keyConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
keyConverterConfigValue,
Converter.class,
Converter::config,
"key converter",
KEY_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
reportStage
);
}

private ConfigInfos validateValueConverterConfig(
Map<String, String> connectorConfig,
ConfigValue valueConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
valueConverterConfigValue,
Converter.class,
Converter::config,
"value converter",
VALUE_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
reportStage
);
}

@Override
public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback) {
validateConnectorConfig(connectorProps, callback, true);
Expand Down Expand Up @@ -562,8 +724,25 @@ ConfigInfos validateConnectorConfig(
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));

// do custom converter-specific validation
ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(
connectorProps,
validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(
connectorProps,
validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(
connectorProps,
validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG),
reportStage
);

ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
ConfigInfos producerConfigInfos = null;
Expand Down Expand Up @@ -612,7 +791,15 @@ ConfigInfos validateConnectorConfig(
connectorClientConfigOverridePolicy);
}
}
return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
return mergeConfigInfos(connType,
configInfos,
producerConfigInfos,
consumerConfigInfos,
adminConfigInfos,
headerConverterConfigInfos,
keyConverterConfigInfos,
valueConverterConfigInfos
);
}
}

Expand All @@ -638,10 +825,6 @@ private static ConfigInfos validateClientOverrides(String connName,
org.apache.kafka.connect.health.ConnectorType connectorType,
ConnectorClientConfigRequest.ClientType clientType,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
int errorCount = 0;
List<ConfigInfo> configInfoList = new LinkedList<>();
Map<String, ConfigKey> configKeys = configDef.configKeys();
Set<String> groups = new LinkedHashSet<>();
Map<String, Object> clientConfigs = new HashMap<>();
for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
String configName = rawClientConfig.getKey();
Expand All @@ -655,27 +838,38 @@ private static ConfigInfos validateClientOverrides(String connName,
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs, clientType);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
if (configValues != null) {
for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
configKeyInfo = convertConfigKey(configKey, prefix);
}

ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (!configValue.errorMessages().isEmpty()) {
errorCount++;
return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
}

private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
int errorCount = 0;
Set<String> groups = new LinkedHashSet<>();
List<ConfigInfo> configInfos = new ArrayList<>();

if (configValues == null) {
return new ConfigInfos("", 0, new ArrayList<>(groups), configInfos);
}

for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
configKeyInfo = convertConfigKey(configKey, prefix);
}

ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (configValue.errorMessages().size() > 0) {
errorCount++;
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
}
return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos);
}

// public for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,13 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
}
Utils.ensureConcreteSubclass(baseClass, cls);

T transformation;
T pluginInstance;
try {
transformation = Utils.newInstance(cls, baseClass);
pluginInstance = Utils.newInstance(cls, baseClass);
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
ConfigDef configDef = config(transformation);
ConfigDef configDef = config(pluginInstance);
if (null == configDef) {
throw new ConnectException(
String.format(
Expand Down
Loading

0 comments on commit 05df104

Please sign in to comment.