From a0c4c3452514bbdaad4e7b54f4e228dd82cb63b5 Mon Sep 17 00:00:00 2001 From: Marko Strukelj Date: Fri, 7 Jul 2023 14:53:30 +0200 Subject: [PATCH] Remove the use of `metric.reporters` in OAuth metrics, use `strimzi.metric.reporters` instead (#193) * Remove the use of `metric.reporters` in OAuth metrics, use `strimzi.metric.reporters` instead. JmxReporter has to be listed explicitly to be instantiated. Signed-off-by: Marko Strukelj * Fix Matcher.match() to Mather.find(() Signed-off-by: Marko Strukelj * Use `strimzi.oauth.metric.reporters` as a config option name Signed-off-by: Marko Strukelj * Fix unused reporters ArrayList. Signed-off-by: Marko Strukelj * Addressed PR comments, implemented using JmxReporter by default Signed-off-by: Marko Strukelj * Shorten the Travis CI build by removing kafka-3.2.3 testsuite run from it. Address README comments. Signed-off-by: Marko Strukelj --------- Signed-off-by: Marko Strukelj --- .travis/build.sh | 11 +- README.md | 66 ++++++- .../kafka-oauth-authz-metrics-client.yaml | 2 + .../kafka-oauth-single-authz-metrics.yaml | 17 +- .../kafka/oauth/metrics/GlobalConfig.java | 17 ++ .../kafka/oauth/services/OAuthMetrics.java | 88 +++++++-- .../kafka/oauth/services/Services.java | 7 + .../testsuite/oauth/common/TestUtil.java | 27 +++ .../common}/metrics/TestMetricsReporter.java | 2 +- .../keycloak-auth-tests/docker-compose.yml | 19 +- .../testsuite/oauth/auth/BasicTests.java | 7 +- .../docker-compose.yml | 15 +- .../keycloak-authz-tests/docker-compose.yml | 15 +- .../docker-compose.yml | 15 +- testsuite/mockoauth-tests/docker-compose.yml | 13 ++ .../testsuite/oauth/MockOAuthTests.java | 2 +- .../testsuite/oauth/mockoauth/Common.java | 3 +- .../mockoauth/KeycloakAuthorizerTest.java | 182 ++++++------------ .../oauth/mockoauth/LogLineReader.java | 29 ++- .../{ => mockoauth}/metrics/MetricEntry.java | 2 +- .../{ => mockoauth}/metrics/Metrics.java | 2 +- .../{ => mockoauth}/metrics/MetricsTest.java | 134 ++++++++++++- 22 files changed, 491 insertions(+), 184 deletions(-) create mode 100644 oauth-common/src/main/java/io/strimzi/kafka/oauth/metrics/GlobalConfig.java rename testsuite/{keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth => common/src/main/java/io/strimzi/testsuite/oauth/common}/metrics/TestMetricsReporter.java (96%) rename testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/{ => mockoauth}/metrics/MetricEntry.java (88%) rename testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/{ => mockoauth}/metrics/Metrics.java (97%) rename testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/{ => mockoauth}/metrics/MetricsTest.java (53%) diff --git a/.travis/build.sh b/.travis/build.sh index 2448534a..0dd801df 100755 --- a/.travis/build.sh +++ b/.travis/build.sh @@ -74,13 +74,14 @@ elif [[ "$arch" != 'ppc64le' ]]; then EXIT=$? exitIfError - clearDockerEnv - mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests - EXIT=$? - exitIfError - # Excluded by default to not exceed Travis job timeout if [ "$SKIP_DISABLED" == "false" ]; then + + clearDockerEnv + mvn -e -V -B clean install -f testsuite -Pkafka-3_2_3 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests + EXIT=$? + exitIfError + clearDockerEnv mvn -e -V -B clean install -f testsuite -Pkafka-3_1_2 -DfailIfNoTests=false -Dtest=\!KeycloakKRaftAuthorizationTests,\!KeycloakZKAuthorizationTests EXIT=$? diff --git a/README.md b/README.md index 7dfa947f..fcb4e93b 100644 --- a/README.md +++ b/README.md @@ -1225,21 +1225,73 @@ Configuring the metrics By default, the gathering and exporting of metrics is disabled. Metrics are available to get an insight into the performance and failures during token validation, authorization operations and client authentication to the authorization server. You can also monitor the authorization server requests by background services such as refreshing of JWKS keys and refreshing of grants when `KeycloakAuthorizer` is used. -You can enable metrics for token validation on the Kafka broker or for client authentication on the client by setting the following JAAS option to `true`: +You can enable metrics for token validation, and `KeycloakAuthorizer` on the Kafka broker or for client authentication on the client by setting the following JAAS option to `true`: - `oauth.enable.metrics` (e.g.: "true") -You can enable metrics for `KeycloakAuthorizer` by setting an analogous option in Kafka broker's `server.properties` file: +You can also enable metrics only for `KeycloakAuthorizer` by setting an analogous option in Kafka broker's `server.properties` file: - `strimzi.authorization.enable.metrics` (e.g.: "true") -If `OAUTH_ENABLE_METRICS` env variable is set or if `oauth.enable.metrics` system property is set, that will both also enable the metrics for `KeycloakAuthorizer`. +If `OAUTH_ENABLE_METRICS` env variable is set or if `oauth.enable.metrics` system property is set, that will also enable the metrics for `KeycloakAuthorizer` (as well as for token validation, and client authentication). -If `oauth.config.id` is specified in JAAS configuration of the listener or the client, it will be available in MBean / metric name as `contextId` attribute. If not specified, it will be calculated from JAAS configuration for the validator or default to `client` in client JAAS config, or `keycloak-authorizer` for KeycloakAuthorizer metrics. +The OAuth metrics ignores the Kafka `metric.reporters` option in order to prevent automatically instantiating double instances of reporters. Most reporters may expect that they are singleton object and may not function properly in multiple copies. +Instead, there is `strimzi.oauth.metric.reporters` option where the reporters that support multiple copies can be specified for the purpose of metrics integration: +- `strimzi.oauth.metric.reporters` (e.g.: "org.apache.kafka.common.metrics.JmxReporter,org.some.package.SomeMetricReporter", use ',' as a separator to enable multiple reporters.) -Metrics are exposed through JMX managed beans. They can also be exposed as Prometheus metrics by using the Prometheus JMX Exporter agent, and mapping the JMX metrics names to prometheus metrics names. +If this configuration option is not set and OAuth metrics is enabled for some component, then a new instance of `org.apache.kafka.common.metrics.JmxReporter` will automatically be instantiated to provide JMX integration for OAuth metrics. +However, if `strimzi.oauth.metric.reporters` is set, then only the reporters specified in the list will be instantiated and integrated. Setting the option to an empty string will result in no reporters instantiated. -The OAuth metrics also honor the `metric.reporters`, `metrics.num.samples`, `metrics.recording.level` and `metrics.sample.window.ms` configurations of Kafka runtimes. When OAuth metrics are enabled the OAuth layer creates duplicate instances of `JmxReporter` and the configured `MetricReporter`s since at the moment there is no other way to integrate with the existing metrics system of Kafka runtimes. +The configuration option `strimzi.oauth.metric.reporters` on the Kafka broker has to be configured as an env variable or a system property. Using it on the Kafka broker inside `server.properties` does not work reliably due to multiple pluggability mechanisms that can be used (authorizer, authentication callback handler, inter-broker client). +Some of these mechanisms get the `server.properties` filtered so only configuration recognised by Kafka makes it through. However, this is a global OAuth Metrics configuration, and it is initialized on first use by any of the components, using the configuration provided to that component. +Specifically, the inter-broker client using OAUTHBEARER might be the first to trigger OAuth Metrics initialisation on the broker, and does not see this config option. -When OAuth metrics are enabled, managed beans are registered on demand, containing the attributes that are easily translated into Prometheus metrics. +In order to reliably configure `strimzi.oauth.metric.reporters` one of the following options should be used when starting a Kafka broker: +- `STRIMZI_OAUTH_METRIC_REPORTERS` env variable +- `strimzi.oauth.metric.reporters` env variable or system property + +At the moment there is no way to integrate with the existing Kafka metrics / reporters objects already instantiated in different Kafka runtimes (producer, consumer, broker, ...). +When OAuth metrics is enabled the OAuth layer has to create its own copies of metric reporters. + +NOTE: In OAuth versions preceding 0.13.0 the configuration option `metric.reporters` was used to configure reporters which were consequently automatically instantiated twice. +The configuration option `metric.reporters` is no longer used. + +The Kafka options that control sampling are honored: `metrics.num.samples`, `metrics.recording.level` and `metrics.sample.window.ms`. + +### Example for Kafka broker: +``` +# Enable OAuth metrics for all listeners, Keycloak authorizer, and inter-broker clients: +export OAUTH_ENABLE_METRICS=true +# Use a custom metric reporter rather than the default JmxReporter +export STRIMZI_OAUTH_METRIC_REPORTERS=org.some.package.SomeMetricReporter +bin/kafka-server-start.sh config/server.properties +``` + +### Example for Kafka client: +``` +# Show the content of client properties file +cat ~/client.properties +... +sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ +oauth.token.endpoint.uri="https://server/token-endpoint" oauth.client.id="clientId" oauth.client.secret="client-secret" oauth.enable.metrics="true"; + +strimzi.oauth.metric.reporters=org.some.package.SomeMetricReporter +... + +# Start the client +bin/kafka-console-producer.sh --broker-list kafka:9092 --topic my-topic --producer.config=$HOME/client.properties +``` + +### Simplest example for Kafka broker: +``` +# Enable OAuth metrics for all listeners, Keycloak authorizer, and inter-broker clients: +# With no 'strimzi.oauth.metric.reporters' specified 'org.apache.kafka.common.metrics.JmxReporter' will be used automatically +export OAUTH_ENABLE_METRICS=true +bin/kafka-server-start.sh config/server.properties +``` + +A common use-case is for metrics to be exposed through JMX managed beans. They can then also be exposed as Prometheus metrics by using the Prometheus JMX Exporter agent, and mapping the JMX metrics names to prometheus metrics names. +If `oauth.config.id` is specified in JAAS configuration of the listener or the client, it will be available in MBean / metric name as `contextId` attribute. If not specified, it will be calculated from JAAS configuration for the validator or default to `client` in client JAAS config, or `keycloak-authorizer` for `KeycloakAuthorizer` metrics. + +When `JmxReporter` is enabled, managed beans are registered on demand, containing the attributes that are easily translated into Prometheus metrics. Each registered MBean contains two counter variables - `count`, and `totalTimeMs`. It also contains three gauge variables - `minTimeMs`, `maxTimeMs` and `avgTimeMs`. These are measured within the configured sample time window. diff --git a/examples/kubernetes/kafka-oauth-authz-metrics-client.yaml b/examples/kubernetes/kafka-oauth-authz-metrics-client.yaml index 554ed1c9..bb3e14ec 100644 --- a/examples/kubernetes/kafka-oauth-authz-metrics-client.yaml +++ b/examples/kubernetes/kafka-oauth-authz-metrics-client.yaml @@ -46,6 +46,8 @@ spec: env: - name: OAUTH_ENABLE_METRICS value: "true" + - name: STRIMZI_OAUTH_METRIC_REPORTERS + value: org.apache.kafka.common.metrics.JmxReporter - name: SECRET valueFrom: secretKeyRef: diff --git a/examples/kubernetes/kafka-oauth-single-authz-metrics.yaml b/examples/kubernetes/kafka-oauth-single-authz-metrics.yaml index f52dad79..4c267f81 100644 --- a/examples/kubernetes/kafka-oauth-single-authz-metrics.yaml +++ b/examples/kubernetes/kafka-oauth-single-authz-metrics.yaml @@ -5,7 +5,7 @@ metadata: name: my-cluster spec: kafka: - version: 3.3.2 + version: 3.4.0 replicas: 1 listeners: - name: plain @@ -68,12 +68,14 @@ spec: env: - name: OAUTH_ENABLE_METRICS value: "true" - # - name: KAFKA_DEBUG - # value: "y" - # - name: DEBUG_SUSPEND_FLAG - # value: "y" - # - name: JAVA_DEBUG_PORT - # value: "5005" + - name: STRIMZI_OAUTH_METRIC_REPORTERS + value: org.apache.kafka.common.metrics.JmxReporter + #- name: KAFKA_DEBUG + # value: "y" + #- name: DEBUG_SUSPEND_FLAG + # value: "n" + #- name: JAVA_DEBUG_PORT + # value: "5005" jmxOptions: {} zookeeper: @@ -94,6 +96,7 @@ spec: entityOperator: topicOperator: {} userOperator: {} + --- kind: ConfigMap apiVersion: v1 diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/metrics/GlobalConfig.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/metrics/GlobalConfig.java new file mode 100644 index 00000000..cc25d729 --- /dev/null +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/metrics/GlobalConfig.java @@ -0,0 +1,17 @@ +/* + * Copyright 2017-2023, Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.oauth.metrics; + +import io.strimzi.kafka.oauth.common.Config; + +/** + * Configuration that can be specified as ENV vars, System properties or in server.properties configuration file, + * but not as part of the JAAS configuration. + */ +public class GlobalConfig extends Config { + + /** The name of the 'strimzi.oauth.metric.reporters' config option */ + public static final String STRIMZI_OAUTH_METRIC_REPORTERS = "strimzi.oauth.metric.reporters"; +} diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/OAuthMetrics.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/OAuthMetrics.java index 26426076..7daa7589 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/OAuthMetrics.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/OAuthMetrics.java @@ -24,13 +24,16 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import static io.strimzi.kafka.oauth.metrics.GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; @@ -38,9 +41,30 @@ /** * The singleton for handling a cache of all the Sensors to prevent unnecessary redundant re-registrations. - * There is a one-to-one mapping between a SensorKey and a Sensor, and one-to-one mapping between a Sensor and an MBean name. - * - * MBeans are registered as requested by JmxReporter attached to the Metrics object. + * There is a one-to-one mapping between a SensorKey and a Sensor, and one-to-one mapping between a Sensor and an MBean name. + *

+ * MBeans are registered as requested by JmxReporter attached to the Metrics object. + * The JmxReporter either has to be explicitly configured using a config option strimzi.oauth.metric.reporters, + * or if that config option si not set, a new instance is configured by default. + *

+ * Since OAuth instantiates its own Metrics object it also has to instantiate reporters to attach them to this Metrics object. + * To prevent double instantiation of MetricReporter objects that require to be singleton, all MetricReporter objects + * to be integrated with OAuthMetrics have to be separately instantiated. + *

+ * Example 1: + *

+ *    strimzi.oauth.metric.reporters=org.apache.kafka.common.metrics.JmxReporter,org.some.package.SomeMetricReporter
+ * 
+ * The above will instantiate and integrate with OAuth metrics the JmxReporter instance, and a SomeMetricReporter instance. + *

+ * Example 2: + *

+ *    strimzi.oauth.metric.reporters=
+ * 
+ * The above will not instantiate and integrate any metric reporters with OAuth metrics, not even JmxReporter. + *

+ * Note: On the Kafka broker it is best to use STRIMZI_OAUTH_METRIC_REPORTERS env variable or strimzi.oauth.metric.reporters system property, + * rather than a `server.properties` global configuration option. */ public class OAuthMetrics { @@ -57,9 +81,14 @@ public class OAuthMetrics { * * @param configMap Configuration properties */ + @SuppressWarnings("unchecked") OAuthMetrics(Map configMap) { - this.configMap = configMap; this.config = new Config(configMap); + + // Make sure to add the resolved 'strimzi.oauth.metric.reporters' configuration to the config map + ((Map) configMap).put(STRIMZI_OAUTH_METRIC_REPORTERS, config.getValue(STRIMZI_OAUTH_METRIC_REPORTERS)); + this.configMap = configMap; + this.metrics = initKafkaMetrics(); } @@ -90,26 +119,47 @@ private Metrics initKafkaMetrics() { private List initReporters() { AbstractConfig kafkaConfig = initKafkaConfig(); - List reporters = kafkaConfig.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - + if (configMap.get(STRIMZI_OAUTH_METRIC_REPORTERS) != null) { + return kafkaConfig.getConfiguredInstances(STRIMZI_OAUTH_METRIC_REPORTERS, MetricsReporter.class); + } JmxReporter reporter = new JmxReporter(); reporter.configure(configMap); - - reporters.add(reporter); - return reporters; + return Collections.singletonList(reporter); } private AbstractConfig initKafkaConfig() { - ConfigDef configDef = new ConfigDef() - .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, - ConfigDef.Type.LIST, - Collections.emptyList(), - new ConfigDef.NonNullValidator(), - ConfigDef.Importance.LOW, - CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC); - - return new AbstractConfig(configDef, configMap); + ConfigDef configDef = addMetricReporterToConfigDef(new ConfigDef(), STRIMZI_OAUTH_METRIC_REPORTERS); + return new AbstractConfig(configDef, toMapOfStringValues(configMap)); + } + + private ConfigDef addMetricReporterToConfigDef(ConfigDef configDef, String name) { + return configDef.define(name, + ConfigDef.Type.LIST, + Collections.emptyList(), + new ConfigDef.NonNullValidator(), + ConfigDef.Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC); + } + + private Map toMapOfStringValues(Map configMap) { + HashMap result = new HashMap<>(); + for (Map.Entry ent: configMap.entrySet()) { + Object val = ent.getValue(); + if (val == null) { + continue; + } + if (val instanceof Class) { + result.put(ent.getKey(), ((Class) val).getCanonicalName()); + } else if (val instanceof List) { + String stringVal = ((List) val).stream().map(String::valueOf).collect(Collectors.joining(",")); + if (!stringVal.isEmpty()) { + result.put(ent.getKey(), stringVal); + } + } else { + result.put(ent.getKey(), String.valueOf(ent.getValue())); + } + } + return result; } private KafkaMetricsContext createKafkaMetricsContext() { diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Services.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Services.java index 0310e579..b42721b3 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Services.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Services.java @@ -39,6 +39,13 @@ public static synchronized void configure(Map configs) { } } + /** + * Close any configured Services so they can be reinitialised again + */ + public static synchronized void close() { + services = null; + } + /** * Get a configured singleton instance * diff --git a/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/TestUtil.java b/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/TestUtil.java index d90e06fc..a2764653 100644 --- a/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/TestUtil.java +++ b/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/TestUtil.java @@ -93,4 +93,31 @@ public static void logStart(String msg) { System.out.println("======== " + msg); System.out.println(); } + + public static int findFirstMatchingInLog(List log, String regex) { + int lineNum = 0; + Pattern pattern = Pattern.compile(regex); + for (String line: log) { + if (pattern.matcher(line).find()) { + return lineNum; + } + lineNum++; + } + return -1; + } + + public static boolean checkLogForRegex(List log, String regex) { + return findFirstMatchingInLog(log, regex) != -1; + } + + public static int countLogForRegex(List log, String regex) { + int count = 0; + Pattern pattern = Pattern.compile(regex); + for (String line: log) { + if (pattern.matcher(line).find()) { + count += 1; + } + } + return count; + } } diff --git a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/metrics/TestMetricsReporter.java b/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/metrics/TestMetricsReporter.java similarity index 96% rename from testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/metrics/TestMetricsReporter.java rename to testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/metrics/TestMetricsReporter.java index 3c11e91d..898c84fd 100644 --- a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/metrics/TestMetricsReporter.java +++ b/testsuite/common/src/main/java/io/strimzi/testsuite/oauth/common/metrics/TestMetricsReporter.java @@ -2,7 +2,7 @@ * Copyright 2017-2022, Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.testsuite.oauth.auth.metrics; +package io.strimzi.testsuite.oauth.common.metrics; import org.apache.kafka.common.metrics.KafkaMetric; import org.slf4j.Logger; diff --git a/testsuite/keycloak-auth-tests/docker-compose.yml b/testsuite/keycloak-auth-tests/docker-compose.yml index fb68f240..6694e43e 100644 --- a/testsuite/keycloak-auth-tests/docker-compose.yml +++ b/testsuite/keycloak-auth-tests/docker-compose.yml @@ -45,7 +45,7 @@ services: - "5006:5006" volumes: - ${PWD}/../docker/target/kafka/libs:/opt/kafka/libs/strimzi - - ${PWD}/target/test-classes:/opt/kafka/libs/strimzi/reporters + - ${PWD}/../common/target/classes:/opt/kafka/libs/strimzi/reporters - ${PWD}/../docker/kafka/config:/opt/kafka/config/strimzi - ${PWD}/../docker/kafka/scripts:/opt/kafka/strimzi command: @@ -66,9 +66,6 @@ services: - KAFKA_INTER_BROKER_LISTENER_NAME=INTROSPECT - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER - # Common settings for all the listeners - - OAUTH_ENABLE_METRICS=true - # username extraction from JWT token claim - OAUTH_USERNAME_CLAIM=preferred_username - KAFKA_PRINCIPAL_BUILDER_CLASS=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder @@ -129,12 +126,24 @@ services: - KAFKA_LISTENER_NAME_FORGE_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler # Test Metrics Reporters - - KAFKA_METRIC_REPORTERS=io.strimzi.testsuite.oauth.auth.metrics.TestMetricsReporter - KAFKA_METRICS_CONTEXT_TEST_LABEL=testvalue - KAFKA_METRICS_NUM_SAMPLES=3 - KAFKA_METRICS_RECORDING_LEVEL=DEBUG - KAFKA_METRICS_SAMPLE_WINDOW_MS=15000 + + # OAuth metrics configuration + + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter,io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter + + # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # For start.sh script to know where the keycloak is listening - KEYCLOAK_HOST=${KEYCLOAK_HOST:-keycloak} - REALM=${REALM:-forge} diff --git a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java index 09ae1aa8..6f7c6dd1 100644 --- a/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java +++ b/testsuite/keycloak-auth-tests/src/test/java/io/strimzi/testsuite/oauth/auth/BasicTests.java @@ -62,12 +62,11 @@ public void doTests() throws Exception { void oauthMetricsConfigIntegration() { System.out.println(" ==== KeycloakAuthenticationTest :: oauthMetricsConfigIntegrationTest"); - // Test MetricReporter config works as expected + // Test that MetricReporter config works as expected // Get kafka log and make sure the TestMetricReporter was initialised exactly twice List lines = getContainerLogsForString(kafkaContainer, "TestMetricsReporter no. "); - Assert.assertEquals("Kafka log should contain: \"TestMetricsReporter no. \" exactly twice", 2, lines.size()); + Assert.assertEquals("Kafka log should contain: \"TestMetricsReporter no. \" exactly once", 1, lines.size()); Assert.assertTrue("Contains \"TestMetricsReporter no. 1\"", lines.get(0).contains("TestMetricsReporter no. 1 ")); - Assert.assertTrue("Contains \"TestMetricsReporter no. 2\"", lines.get(1).contains("TestMetricsReporter no. 2 ")); // Ensure the configuration was applied as expected lines = getContainerLogsForString(kafkaContainer, "Creating Metrics:"); @@ -82,7 +81,7 @@ void oauthMetricsConfigIntegration() { Assert.assertTrue("kafka.broker.id=1", line.contains("kafka.broker.id=1")); line = lines.get(3); - Assert.assertTrue("io.strimzi.testsuite.oauth.auth.metrics.TestMetricsReporter", line.contains("io.strimzi.testsuite.oauth.auth.metrics.TestMetricsReporter")); + Assert.assertTrue("io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter", line.contains("io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter")); Assert.assertTrue("org.apache.kafka.common.metrics.JmxReporter", line.contains("org.apache.kafka.common.metrics.JmxReporter")); } diff --git a/testsuite/keycloak-authz-kraft-tests/docker-compose.yml b/testsuite/keycloak-authz-kraft-tests/docker-compose.yml index 31991b79..9441515b 100644 --- a/testsuite/keycloak-authz-kraft-tests/docker-compose.yml +++ b/testsuite/keycloak-authz-kraft-tests/docker-compose.yml @@ -74,8 +74,6 @@ services: - OAUTH_USERNAME_CLAIM=preferred_username - OAUTH_CONNECT_TIMEOUT_SECONDS=20 - - OAUTH_ENABLE_METRICS=true - # Configuration of individual listeners - KAFKA_LISTENER_NAME_CONTROLLER_SASL_ENABLED_MECHANISMS=PLAIN - KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-password\" user_admin=\"admin-password\" user_bobby=\"bobby-secret\" ; @@ -134,6 +132,19 @@ services: - KAFKA_SUPER_USERS=User:admin;User:service-account-kafka + + # OAuth metrics configuration + + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # Other configuration - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 diff --git a/testsuite/keycloak-authz-tests/docker-compose.yml b/testsuite/keycloak-authz-tests/docker-compose.yml index c376e750..b4c510fb 100644 --- a/testsuite/keycloak-authz-tests/docker-compose.yml +++ b/testsuite/keycloak-authz-tests/docker-compose.yml @@ -68,8 +68,6 @@ services: - OAUTH_USERNAME_CLAIM=preferred_username - OAUTH_CONNECT_TIMEOUT_SECONDS=20 - - OAUTH_ENABLE_METRICS=true - # Configuration of individual listeners - KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/kafka-authz\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.groups.claim=\"$$.realm_access.roles\" ; - KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler @@ -122,6 +120,19 @@ services: - KAFKA_SUPER_USERS=User:admin;User:service-account-kafka + + # OAuth metrics configuration + + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # Other configuration - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 diff --git a/testsuite/keycloak-authz-zk-tests/docker-compose.yml b/testsuite/keycloak-authz-zk-tests/docker-compose.yml index 72918760..1d8516b2 100644 --- a/testsuite/keycloak-authz-zk-tests/docker-compose.yml +++ b/testsuite/keycloak-authz-zk-tests/docker-compose.yml @@ -68,8 +68,6 @@ services: - OAUTH_USERNAME_CLAIM=preferred_username - OAUTH_CONNECT_TIMEOUT_SECONDS=20 - - OAUTH_ENABLE_METRICS=true - # Configuration of individual listeners - KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/kafka-authz\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.groups.claim=\"$$.realm_access.roles\" ; - KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler @@ -126,6 +124,19 @@ services: - KAFKA_SUPER_USERS=User:admin;User:service-account-kafka + + # OAuth metrics configuration + + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # Other configuration - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 diff --git a/testsuite/mockoauth-tests/docker-compose.yml b/testsuite/mockoauth-tests/docker-compose.yml index 5e2d99b9..71f26ac3 100644 --- a/testsuite/mockoauth-tests/docker-compose.yml +++ b/testsuite/mockoauth-tests/docker-compose.yml @@ -34,6 +34,8 @@ services: - "9097:9097" - "9098:9098" - "9404:9404" + + # Debug port - "5006:5006" volumes: - ${PWD}/../docker/target/kafka/libs:/opt/kafka/libs/strimzi @@ -104,7 +106,18 @@ services: - OAUTH_SSL_TRUSTSTORE_TYPE=pkcs12 - OAUTH_CONNECT_TIMEOUT_SECONDS=10 - OAUTH_READ_TIMEOUT_SECONDS=10 + + + # OAuth metrics configuration + - OAUTH_ENABLE_METRICS=true + # When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX + # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS + - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter + + # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics + #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter zookeeper: image: ${KAFKA_DOCKER_IMAGE} diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java index a1b0ae0c..591bd948 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java @@ -6,7 +6,7 @@ import io.strimzi.testsuite.oauth.common.TestContainersLogCollector; import io.strimzi.testsuite.oauth.common.TestContainersWatcher; -import io.strimzi.testsuite.oauth.metrics.MetricsTest; +import io.strimzi.testsuite.oauth.mockoauth.metrics.MetricsTest; import io.strimzi.testsuite.oauth.mockoauth.ConnectTimeoutTests; import io.strimzi.testsuite.oauth.mockoauth.JWKSKeyUseTest; import io.strimzi.testsuite.oauth.mockoauth.JaasClientConfigTest; diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java index 73d35771..4801ba8d 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/Common.java @@ -13,7 +13,7 @@ import io.strimzi.kafka.oauth.common.SSLUtil; import io.strimzi.kafka.oauth.common.TimeUtil; import io.strimzi.kafka.oauth.common.TokenInfo; -import io.strimzi.testsuite.oauth.metrics.Metrics; +import io.strimzi.testsuite.oauth.mockoauth.metrics.Metrics; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Assert; @@ -35,6 +35,7 @@ public class Common { static final String WWW_FORM_CONTENT_TYPE = "application/x-www-form-urlencoded"; + public static final String LOG_PATH = "target/test.log"; static String getJaasConfigOptionsString(Map options) { StringBuilder sb = new StringBuilder(); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java index 040c8a24..d38b9bdf 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/KeycloakAuthorizerTest.java @@ -54,9 +54,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; import java.util.stream.Collectors; +import static io.strimzi.testsuite.oauth.common.TestUtil.checkLogForRegex; import static io.strimzi.testsuite.oauth.mockoauth.Common.addGrantsForToken; import static io.strimzi.testsuite.oauth.mockoauth.Common.changeAuthServerMode; import static io.strimzi.testsuite.oauth.mockoauth.Common.createOAuthClient; @@ -71,8 +71,6 @@ public class KeycloakAuthorizerTest { static final int LOOP_PAUSE_MS = 1000; static final int TIMEOUT_SECONDS = 30; - static final String LOG_PATH = "target/test.log"; - static final String CLIENT_CLI = "kafka-cli"; static final String USER_ALICE = "alice"; @@ -121,7 +119,7 @@ void doGrants401Test() throws IOException, InterruptedException, TimeoutExceptio changeAuthServerMode("token", "MODE_200"); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); logReader.readNext(); List lines; @@ -166,7 +164,7 @@ void doGrants401Test() throws IOException, InterruptedException, TimeoutExceptio changeAuthServerMode("grants", "MODE_401"); LOG.info("Waiting for: Done refreshing"); // Make sure to not repeat the below condition in the string here - lines = waitFor(logReader, "Done refreshing grants"); + lines = logReader.waitFor("Done refreshing grants"); Assert.assertTrue("Failed to fetch", checkLogForRegex(lines, "Failed to fetch grants .* status 401")); Assert.assertTrue("Removed user from grants cache", checkLogForRegex(lines, "Removed user from grants cache: alice")); Assert.assertTrue("Removed invalid session", checkLogForRegex(lines, "Removed invalid session from sessions map \\(userId: alice")); @@ -184,7 +182,7 @@ void doGrants403Test() throws IOException { // Switch grants endpoint to 403 mode changeAuthServerMode("grants", "MODE_403"); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); logReader.readNext(); List lines; @@ -298,7 +296,7 @@ private void runConcurrentFetchGrantsTest(HashMap props, boolean try (KeycloakAuthorizer authorizer = new KeycloakAuthorizer()) { authorizer.configure(props); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); List lines = logReader.readNext(); if (withReuse) { @@ -334,9 +332,9 @@ private void runConcurrentFetchGrantsTest(HashMap props, boolean // It's the same whether reuseGrants is true or false - because concurrent requests are perceived by user to occur at the same time lines = logReader.readNext(); - Assert.assertEquals("One thread fetches grants", 1, countLogForRegex(lines, "Fetching grants from Keycloak for user user1")); - Assert.assertEquals("One thread waits", 1, countLogForRegex(lines, "Waiting on another thread to get grants")); - Assert.assertEquals("One grants fetch", 1, countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); + Assert.assertEquals("One thread fetches grants", 1, TestUtil.countLogForRegex(lines, "Fetching grants from Keycloak for user user1")); + Assert.assertEquals("One thread waits", 1, TestUtil.countLogForRegex(lines, "Waiting on another thread to get grants")); + Assert.assertEquals("One grants fetch", 1, TestUtil.countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); // Check the authorization result Assert.assertEquals("One result for my-topic action", 1, result.size()); @@ -359,7 +357,7 @@ private void runConcurrentFetchGrantsTest(HashMap props, boolean // check log from last checkpoint on lines = logReader.readNext(); - Assert.assertEquals("No grants fetch", 0, countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); + Assert.assertEquals("No grants fetch", 0, TestUtil.countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); // Check the authorization result Assert.assertEquals("One result for x-topic-1 action", 1, result.size()); @@ -376,10 +374,10 @@ private void runConcurrentFetchGrantsTest(HashMap props, boolean lines = logReader.readNext(); if (!withReuse) { // Check that grants have been fetched - Assert.assertEquals("Grants fetched", 1, countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); + Assert.assertEquals("Grants fetched", 1, TestUtil.countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); } else { // Check that grants have not been fetched again - Assert.assertEquals("Grants not fetched", 0, countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); + Assert.assertEquals("Grants not fetched", 0, TestUtil.countLogForRegex(lines, "Response body for POST https://mockoauth:8090/grants")); } // Check the authorization result @@ -428,7 +426,7 @@ void doConfigTests() throws IOException { } config.put(AuthzConfig.STRIMZI_AUTHORIZATION_CLIENT_ID, "kafka"); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); // Position to the end of the existing log file logReader.readNext(); @@ -440,22 +438,22 @@ void doConfigTests() throws IOException { List lines = logReader.readNext(); // Check the defaults - Assert.assertEquals("tokenEndpointUri: https://mockoauth:8090/grants", 1, countLogForRegex(lines, "tokenEndpointUri: https://mockoauth:8090/grants")); - Assert.assertEquals("clientId: kafka", 1, countLogForRegex(lines, "clientId: kafka")); - Assert.assertEquals("sslSocketFactory: null", 1, countLogForRegex(lines, "sslSocketFactory: null")); - Assert.assertEquals("hostnameVerifier: null", 1, countLogForRegex(lines, "hostnameVerifier: null")); - Assert.assertEquals("clusterName: kafka-cluster", 1, countLogForRegex(lines, "clusterName: kafka-cluster")); - Assert.assertEquals("delegateToKafkaACL: false", 1, countLogForRegex(lines, "delegateToKafkaACL: false")); - Assert.assertEquals("superUsers: []", 1, countLogForRegex(lines, "superUsers: \\[\\]")); - Assert.assertEquals("grantsRefreshPeriodSeconds: 60", 1, countLogForRegex(lines, "grantsRefreshPeriodSeconds: 60")); - Assert.assertEquals("grantsRefreshPoolSize: 5", 1, countLogForRegex(lines, "grantsRefreshPoolSize: 5")); - Assert.assertEquals("grantsMaxIdleTimeSeconds: 300", 1, countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 300")); - Assert.assertEquals("httpRetries: 0", 1, countLogForRegex(lines, "httpRetries: 0")); - Assert.assertEquals("reuseGrants: true", 1, countLogForRegex(lines, "reuseGrants: true")); - Assert.assertEquals("connectTimeoutSeconds: 60", 1, countLogForRegex(lines, "connectTimeoutSeconds: 60")); - Assert.assertEquals("readTimeoutSeconds: 60", 1, countLogForRegex(lines, "readTimeoutSeconds: 60")); - Assert.assertEquals("enableMetrics: false", 1, countLogForRegex(lines, "enableMetrics: false")); - Assert.assertEquals("gcPeriodSeconds: 300", 1, countLogForRegex(lines, "gcPeriodSeconds: 300")); + Assert.assertEquals("tokenEndpointUri: https://mockoauth:8090/grants", 1, TestUtil.countLogForRegex(lines, "tokenEndpointUri: https://mockoauth:8090/grants")); + Assert.assertEquals("clientId: kafka", 1, TestUtil.countLogForRegex(lines, "clientId: kafka")); + Assert.assertEquals("sslSocketFactory: null", 1, TestUtil.countLogForRegex(lines, "sslSocketFactory: null")); + Assert.assertEquals("hostnameVerifier: null", 1, TestUtil.countLogForRegex(lines, "hostnameVerifier: null")); + Assert.assertEquals("clusterName: kafka-cluster", 1, TestUtil.countLogForRegex(lines, "clusterName: kafka-cluster")); + Assert.assertEquals("delegateToKafkaACL: false", 1, TestUtil.countLogForRegex(lines, "delegateToKafkaACL: false")); + Assert.assertEquals("superUsers: []", 1, TestUtil.countLogForRegex(lines, "superUsers: \\[\\]")); + Assert.assertEquals("grantsRefreshPeriodSeconds: 60", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPeriodSeconds: 60")); + Assert.assertEquals("grantsRefreshPoolSize: 5", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPoolSize: 5")); + Assert.assertEquals("grantsMaxIdleTimeSeconds: 300", 1, TestUtil.countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 300")); + Assert.assertEquals("httpRetries: 0", 1, TestUtil.countLogForRegex(lines, "httpRetries: 0")); + Assert.assertEquals("reuseGrants: true", 1, TestUtil.countLogForRegex(lines, "reuseGrants: true")); + Assert.assertEquals("connectTimeoutSeconds: 60", 1, TestUtil.countLogForRegex(lines, "connectTimeoutSeconds: 60")); + Assert.assertEquals("readTimeoutSeconds: 60", 1, TestUtil.countLogForRegex(lines, "readTimeoutSeconds: 60")); + Assert.assertEquals("enableMetrics: false", 1, TestUtil.countLogForRegex(lines, "enableMetrics: false")); + Assert.assertEquals("gcPeriodSeconds: 300", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 300")); // Custom config @@ -487,17 +485,17 @@ void doConfigTests() throws IOException { lines = logReader.readNext(); - Assert.assertEquals("clusterName: cluster1", 1, countLogForRegex(lines, "clusterName: cluster1")); - Assert.assertEquals("superUsers: ['User:admin', 'User:service-account-kafka']", 1, countLogForRegex(lines, "superUsers: \\['User:admin', 'User:service-account-kafka'\\]")); - Assert.assertEquals("grantsRefreshPeriodSeconds: 180", 1, countLogForRegex(lines, "grantsRefreshPeriodSeconds: 180")); - Assert.assertEquals("grantsRefreshPoolSize: 3", 1, countLogForRegex(lines, "grantsRefreshPoolSize: 3")); - Assert.assertEquals("grantsMaxIdleTimeSeconds: 30", 1, countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 30")); - Assert.assertEquals("httpRetries: 2", 1, countLogForRegex(lines, "httpRetries: 2")); - Assert.assertEquals("reuseGrants: false", 1, countLogForRegex(lines, "reuseGrants: false")); - Assert.assertEquals("connectTimeoutSeconds: 15", 1, countLogForRegex(lines, "connectTimeoutSeconds: 15")); - Assert.assertEquals("readTimeoutSeconds: 15", 1, countLogForRegex(lines, "readTimeoutSeconds: 15")); - Assert.assertEquals("enableMetrics: true", 1, countLogForRegex(lines, "enableMetrics: true")); - Assert.assertEquals("gcPeriodSeconds: 60", 1, countLogForRegex(lines, "gcPeriodSeconds: 60")); + Assert.assertEquals("clusterName: cluster1", 1, TestUtil.countLogForRegex(lines, "clusterName: cluster1")); + Assert.assertEquals("superUsers: ['User:admin', 'User:service-account-kafka']", 1, TestUtil.countLogForRegex(lines, "superUsers: \\['User:admin', 'User:service-account-kafka'\\]")); + Assert.assertEquals("grantsRefreshPeriodSeconds: 180", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPeriodSeconds: 180")); + Assert.assertEquals("grantsRefreshPoolSize: 3", 1, TestUtil.countLogForRegex(lines, "grantsRefreshPoolSize: 3")); + Assert.assertEquals("grantsMaxIdleTimeSeconds: 30", 1, TestUtil.countLogForRegex(lines, "grantsMaxIdleTimeSeconds: 30")); + Assert.assertEquals("httpRetries: 2", 1, TestUtil.countLogForRegex(lines, "httpRetries: 2")); + Assert.assertEquals("reuseGrants: false", 1, TestUtil.countLogForRegex(lines, "reuseGrants: false")); + Assert.assertEquals("connectTimeoutSeconds: 15", 1, TestUtil.countLogForRegex(lines, "connectTimeoutSeconds: 15")); + Assert.assertEquals("readTimeoutSeconds: 15", 1, TestUtil.countLogForRegex(lines, "readTimeoutSeconds: 15")); + Assert.assertEquals("enableMetrics: true", 1, TestUtil.countLogForRegex(lines, "enableMetrics: true")); + Assert.assertEquals("gcPeriodSeconds: 60", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 60")); // test gcPeriodSeconds set to 0 @@ -510,8 +508,8 @@ void doConfigTests() throws IOException { lines = logReader.readNext(); - Assert.assertEquals("gcPeriodSeconds invalid value: 0", 1, countLogForRegex(lines, "'strimzi.authorization.grants.gc.period.seconds' set to invalid value: 0, using the default value: 300 seconds")); - Assert.assertEquals("gcPeriodSeconds: 300", 1, countLogForRegex(lines, "gcPeriodSeconds: 300")); + Assert.assertEquals("gcPeriodSeconds invalid value: 0", 1, TestUtil.countLogForRegex(lines, "'strimzi.authorization.grants.gc.period.seconds' set to invalid value: 0, using the default value: 300 seconds")); + Assert.assertEquals("gcPeriodSeconds: 300", 1, TestUtil.countLogForRegex(lines, "gcPeriodSeconds: 300")); TestAuthzUtil.clearKeycloakAuthorizerService(); } @@ -554,11 +552,11 @@ void doGrantsGCTests() throws Exception { // check the logs for updated access token - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); // wait for cgGrants run on 0 users LOG.info("Waiting for: active users count: 0"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Grants gc: active users count: 0"); + logReader.waitFor("Grants gc: active users count: 0"); LOG.info("Authenticate (validate) as gcUser1"); OAuthKafkaPrincipal principal = authenticate(authHandler, tokenInfo); @@ -581,7 +579,7 @@ void doGrantsGCTests() throws Exception { // check the logs for updated access token List lines = logReader.readNext(); - Assert.assertEquals("Fetch grants", 1, countLogForRegex(lines, "Fetching grants from Keycloak for user gcUser1")); + Assert.assertEquals("Fetch grants", 1, TestUtil.countLogForRegex(lines, "Fetching grants from Keycloak for user gcUser1")); String userTwo = "gcUser2"; @@ -598,7 +596,7 @@ void doGrantsGCTests() throws Exception { LOG.info("Waiting for: active users count: 2, grantsCache size before: 1, grantsCache size after: 1"); // Make sure to not repeat the below condition in the string here // wait for cgGrants run on 2 users - waitFor(logReader, "Grants gc: active users count: 2, grantsCache size before: 1, grantsCache size after: 1"); + logReader.waitFor("Grants gc: active users count: 2, grantsCache size before: 1, grantsCache size after: 1"); authzContext = newAuthorizableRequestContext(principal); @@ -609,12 +607,12 @@ void doGrantsGCTests() throws Exception { // wait for cgGrants run on 2 users and two grants cache entries LOG.info("Waiting for: active users count: 2, grantsCache size before: 2, grantsCache size after: 2"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Grants gc: active users count: 2, grantsCache size before: 2, grantsCache size after: 2"); + logReader.waitFor("Grants gc: active users count: 2, grantsCache size before: 2, grantsCache size after: 2"); // now wait for token to expire for gcUser2 LOG.info("Waiting for: active users count: 1, grantsCache size before: 2, grantsCache size after: 1"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Grants gc: active users count: 1, grantsCache size before: 2, grantsCache size after: 1"); + logReader.waitFor("Grants gc: active users count: 1, grantsCache size before: 2, grantsCache size after: 1"); // authorization should now fail since the token has expired @@ -697,7 +695,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL), 1, true, true)); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); // seek to the end of log file logReader.readNext(); @@ -716,7 +714,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE // This is a first authorize() call on the KeycloakAuthorizer -> the grantsCache is empty LOG.info("Waiting for: unsupported segment type: Topc"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Failed to parse .* unsupported segment type: Topc"); + logReader.waitFor("Failed to parse .* unsupported segment type: Topc"); // malformed resource spec - no ':' in Topic;my-topic* @@ -726,7 +724,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE // wait for grants refresh LOG.info("Waiting for: Done refreshing grants"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Response body .*Topic;my-topic"); + logReader.waitFor("Response body .*Topic;my-topic"); LOG.info("Call authorize() - test grants record with malformed resource spec 'Topic;my-topic*' (no ':')"); @@ -734,7 +732,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE Assert.assertEquals("Authz result: DENIED", AuthorizationResult.DENIED, result.get(0)); LOG.info("Waiting for: doesn't follow TYPE:NAME pattern"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "part doesn't follow TYPE:NAME pattern"); + logReader.waitFor("part doesn't follow TYPE:NAME pattern"); // malformed resource spec - '*' not at the end in 'Topic:*-topic' addGrantsForToken(tokenInfo.token(), "[{\"scopes\":[\"Delete\",\"Write\",\"Describe\",\"Read\",\"Alter\",\"Create\",\"DescribeConfigs\",\"AlterConfigs\"],\"rsid\":\"ca6f195f-dbdc-48b7-a953-8e441d17f7fa\",\"rsname\":\"Topic:*-topic\"}," + @@ -743,7 +741,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE // wait for grants refresh LOG.info("Waiting for: Done refreshing grants"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Response body .*Topic:\\*-topic"); + logReader.waitFor("Response body .*Topic:\\*-topic"); LOG.info("Call authorize() - test grants record with malformed resource spec 'Topic:*-topic' ('*' only interpreted as asterisk at the end of resource spec)"); result = authorizer.authorize(authzContext, actions); @@ -757,7 +755,7 @@ void doMalformedGrantsTests() throws IOException, InterruptedException, TimeoutE // wait for grants refresh LOG.info("Waiting for: Done refreshing grants"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Response body .*Crate"); + logReader.waitFor("Response body .*Crate"); LOG.info("Call authorize() - test grants record with unknown / invalid scope 'Crate' (it should be 'Create')"); result = authorizer.authorize(authzContext, actions); @@ -793,7 +791,7 @@ void doGrantsSemanticEqualsTest() throws Exception { new ResourcePattern(ResourceType.TOPIC, "x_topic", PatternType.LITERAL), 1, true, true)); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); // seek to the end of log file logReader.readNext(); @@ -812,7 +810,7 @@ void doGrantsSemanticEqualsTest() throws Exception { // Check log for 'Saving non-null grants for user: alice' LOG.info("Waiting for: Saving non-null grants"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Saving non-null grants for user: alice"); + logReader.waitFor("Saving non-null grants for user: alice"); // set grants for the user to `grants2` which are semantically different from `grants1` addGrantsForToken(tokenInfo.token(), grants2); @@ -820,22 +818,21 @@ void doGrantsSemanticEqualsTest() throws Exception { // wait for the refresh job to fetch the new grants // Check log for 'Grants have changed for user: alice' LOG.info("Waiting for: Grants have changed"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Grants have changed for user: alice"); + logReader.waitFor("Grants have changed for user: alice"); // set grants for the user to `grants3` which are semantically equal to `grants2` addGrantsForToken(tokenInfo.token(), grants3); // wait for the refresh job to fetch the new grants LOG.info("Waiting for: Refreshing grants to start"); // Make sure to not repeat the below condition in the string here - waitFor(logReader, "Refreshing authorization grants"); + logReader.waitFor("Refreshing authorization grants"); // Check log for 'Done refreshing grants' and there should be no preceding line containing 'Grants have changed for user' // wait for refresh grants job to complete LOG.info("Waiting for grants refresh to complete"); // Make sure to not repeat the below condition in the string here - List lines = waitFor(logReader, "Done refreshing grants"); + List lines = logReader.waitFor("Done refreshing grants"); - int matchCount = countLogForRegex(lines, "Grants have changed for user"); - Assert.assertEquals("Grants have changed again ?!?", 0, matchCount); + Assert.assertFalse("Grants have changed again ?!?", checkLogForRegex(lines, "Grants have changed for user")); result = authorizer.authorize(authzContext, actions); Assert.assertEquals("Authz result: ALLOWED", AuthorizationResult.ALLOWED, result.get(0)); @@ -849,7 +846,7 @@ void doSingletonTest() throws Exception { HashMap config = configureAuthorizer(); - LogLineReader logReader = new LogLineReader(LOG_PATH); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); logReader.readNext(); List lines; @@ -871,26 +868,6 @@ void doSingletonTest() throws Exception { TestAuthzUtil.clearKeycloakAuthorizerService(); } - private List waitFor(LogLineReader logReader, String condition) throws TimeoutException, InterruptedException { - List result = new ArrayList<>(); - TestUtil.waitForCondition(() -> { - try { - List lines = logReader.readNext(); - int lineNum = findFirstMatchingInLog(lines, condition); - if (lineNum >= 0) { - result.addAll(lines.subList(0, lineNum)); - return true; - } - result.addAll(lines); - return false; - } catch (Exception e) { - throw new RuntimeException("Failed to read log", e); - } - }, LOOP_PAUSE_MS, TIMEOUT_SECONDS); - - return result; - } - private static Future> submitAuthorizationCall(KeycloakAuthorizer authorizer, AuthorizableRequestContext ctx, ExecutorService executorService, String topic) { return executorService.submit(() -> { List actions = new ArrayList<>(); @@ -929,45 +906,6 @@ static HashMap configureAuthorizer(String clientSrv, String clie return props; } - static int countLogForRegex(List log, String regex) { - int count = 0; - Pattern pattern = Pattern.compile(prepareRegex(regex)); - for (String line: log) { - if (pattern.matcher(line).matches()) { - count += 1; - } - } - return count; - } - - static int findFirstMatchingInLog(List log, String regex) { - int lineNum = 0; - Pattern pattern = Pattern.compile(prepareRegex(regex)); - for (String line: log) { - if (pattern.matcher(line).matches()) { - return lineNum; - } - lineNum++; - } - return -1; - } - - static String prepareRegex(String regex) { - String prefix = regex.startsWith("^") ? "" : ".*"; - String suffix = regex.endsWith("$") ? "" : ".*"; - return prefix + regex + suffix; - } - - static boolean checkLogForRegex(List log, String regex) { - Pattern pattern = Pattern.compile(prepareRegex(regex)); - for (String line: log) { - if (pattern.matcher(line).matches()) { - return true; - } - } - return false; - } - private AuthorizableRequestContext newAuthorizableRequestContext(KafkaPrincipal principal) { AuthorizableRequestContext ctx = mock(AuthorizableRequestContext.class); when(ctx.listenerName()).thenReturn("JWT"); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/LogLineReader.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/LogLineReader.java index 24bb26d1..270870ca 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/LogLineReader.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/LogLineReader.java @@ -4,10 +4,14 @@ */ package io.strimzi.testsuite.oauth.mockoauth; +import io.strimzi.testsuite.oauth.common.TestUtil; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeoutException; /** * A very inefficient but simple and good-enough-for-tests implementation of incrementally reading a log file and serving content as lines @@ -17,11 +21,32 @@ public class LogLineReader { private final String logPath; private int logLineOffset = 0; - LogLineReader(String logPath) { + public LogLineReader(String logPath) { this.logPath = logPath; } - List readNext() throws IOException { + public List waitFor(String condition) throws TimeoutException, InterruptedException { + List result = new ArrayList<>(); + TestUtil.waitForCondition(() -> { + try { + List lines = readNext(); + int lineNum = TestUtil.findFirstMatchingInLog(lines, condition); + if (lineNum >= 0) { + result.addAll(lines.subList(0, lineNum)); + logLineOffset -= lines.size() - lineNum + 1; + return true; + } + result.addAll(lines); + return false; + } catch (Exception e) { + throw new RuntimeException("Failed to read log", e); + } + }, KeycloakAuthorizerTest.LOOP_PAUSE_MS, KeycloakAuthorizerTest.TIMEOUT_SECONDS); + + return result; + } + + public List readNext() throws IOException { List lines = Files.readAllLines(Paths.get(logPath)); List result = lines.subList(logLineOffset, lines.size()); logLineOffset = lines.size(); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricEntry.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricEntry.java similarity index 88% rename from testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricEntry.java rename to testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricEntry.java index 1ca0daba..618644f0 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricEntry.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricEntry.java @@ -2,7 +2,7 @@ * Copyright 2017-2022, Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.testsuite.oauth.metrics; +package io.strimzi.testsuite.oauth.mockoauth.metrics; import java.util.Map; diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/Metrics.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/Metrics.java similarity index 97% rename from testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/Metrics.java rename to testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/Metrics.java index 315d4b98..3a4dbc5a 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/Metrics.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/Metrics.java @@ -2,7 +2,7 @@ * Copyright 2017-2022, Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.testsuite.oauth.metrics; +package io.strimzi.testsuite.oauth.mockoauth.metrics; import java.math.BigDecimal; import java.util.ArrayList; diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricsTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricsTest.java similarity index 53% rename from testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricsTest.java rename to testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricsTest.java index 75a12461..6c0a50e1 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/metrics/MetricsTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/metrics/MetricsTest.java @@ -2,12 +2,29 @@ * Copyright 2017-2021, Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.testsuite.oauth.metrics; - +package io.strimzi.testsuite.oauth.mockoauth.metrics; + +import io.strimzi.kafka.oauth.client.ClientConfig; +import io.strimzi.kafka.oauth.metrics.GlobalConfig; +import io.strimzi.kafka.oauth.services.Services; +import io.strimzi.testsuite.oauth.common.TestUtil; +import io.strimzi.testsuite.oauth.mockoauth.Common; +import io.strimzi.testsuite.oauth.mockoauth.LogLineReader; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.math.BigDecimal; import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeoutException; import static io.strimzi.testsuite.oauth.mockoauth.Common.changeAuthServerMode; import static io.strimzi.testsuite.oauth.mockoauth.Common.getPrometheusMetrics; @@ -16,8 +33,14 @@ public class MetricsTest { + private static final Logger LOG = LoggerFactory.getLogger(MetricsTest.class); + private final static int PAUSE_MILLIS = 15_000; + private static final String KAFKA_BOOTSTRAP = "kafka:9092"; + private static final String TOKEN_ENDPOINT_URI = "https://mockoauth:8090/token"; + + public void doTest() throws Exception { logStart("Check there are no Prometheus metrics errors before the tests are run"); @@ -34,6 +57,65 @@ public void doTest() throws Exception { logStart("Check 5xx errors show in Prometheus metrics"); testInternalServerErrors(); + + logStart("Check `strimzi.oauth.metric.reporters` configuration handling"); + testOAuthMetricReporters(); + + logStart("Check enabling metrics with JMX for the Kafka client"); + testKafkaClientConfig(); + } + + private void testKafkaClientConfig() throws Exception { + + Map oauthConfig = new HashMap<>(); + oauthConfig.put(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, TOKEN_ENDPOINT_URI); + oauthConfig.put(ClientConfig.OAUTH_CLIENT_ID, "ignored"); + oauthConfig.put(ClientConfig.OAUTH_CLIENT_SECRET, "ignored"); + oauthConfig.put(ClientConfig.OAUTH_ENABLE_METRICS, "true"); + + Properties producerProps = new Properties(); + + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); + logReader.readNext(); + + // Clear the configured metrics in order to trigger reinitialisation + Services.close(); + + try { + initJaas(oauthConfig, producerProps); + Assert.fail("Should have failed due to bad access token"); + + } catch (Exception e) { + LOG.debug("[IGNORED] Failed as expected: {}", e.getMessage(), e); + } + + List lines = logReader.readNext(); + Assert.assertTrue("Instantiated JMX Reporter", TestUtil.checkLogForRegex(lines, "reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter")); + + + producerProps.put(GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS, "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + + // Clear the configured metrics in order to trigger reinitialisation + Services.close(); + + try { + initJaas(oauthConfig, producerProps); + Assert.fail("Should have failed due to bad access token"); + + } catch (Exception e) { + LOG.debug("[IGNORED] Failed as expected: {}", e.getMessage(), e); + } + + lines = logReader.readNext(); + Assert.assertTrue("Instantiated TestMetricsReporter", TestUtil.checkLogForRegex(lines, "reporters: \\[io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter[^,]+\\]")); + } + + private void initJaas(Map oauthConfig, Properties additionalProps) throws Exception { + Properties producerProps = Common.buildProducerConfigOAuthBearer(KAFKA_BOOTSTRAP, oauthConfig); + producerProps.putAll(additionalProps); + try (Producer producer = new KafkaProducer<>(producerProps)) { + producer.send(new ProducerRecord<>("Test-testTopic", "The Message")).get(); + } } private void testInternalServerErrors() throws IOException, InterruptedException { @@ -155,6 +237,54 @@ private void zeroCheck() throws IOException { Assert.assertNull(value); } + private void testOAuthMetricReporters() throws IOException, InterruptedException, TimeoutException { + + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); + // seek to the end of log file + logReader.readNext(); + + Map configs = new HashMap<>(); + configs.put("strimzi.oauth.metric.reporters", "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + reinitServices(configs); + Services.getInstance().getMetrics(); + LOG.info("Waiting for: reporters: TestMetricsReporter"); // Make sure to not repeat the below condition in the string here + logReader.waitFor("reporters: \\[io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter[^,]+\\]"); + + configs.remove("strimzi.oauth.metric.reporters"); + configs.put("metric.reporters", "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + reinitServices(configs); + // JmxReporter will be instantiated, 'metric.reporters' setting is ignored + Services.getInstance().getMetrics(); + LOG.info("Waiting for: reporters: JmxReporter"); // Make sure to not repeat the below condition in the string here + logReader.waitFor("reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter"); + + configs.put("strimzi.oauth.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter"); + reinitServices(configs); + // Only JmxReporter will be instantiated the other setting is ignored + Services.getInstance().getMetrics(); + LOG.info("Waiting for: reporters: JmxReporter"); // Make sure to not repeat the below condition in the string here + logReader.waitFor("reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter"); + + configs.put("strimzi.oauth.metric.reporters", ""); + reinitServices(configs); + // No reporter will be instantiated + Services.getInstance().getMetrics(); + LOG.info("Waiting for: reporters: "); // Make sure to not repeat the below condition in the string here + logReader.waitFor("reporters: \\[\\]"); + + configs.put("strimzi.oauth.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter,io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter"); + reinitServices(configs); + // JmxReporter and TestMetricsReporter are instantiated + Services.getInstance().getMetrics(); + LOG.info("Waiting for: reporters: JmxReporter,TestMetricsReporter"); // Make sure to not repeat the below condition in the string here + logReader.waitFor("reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter.*, io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter"); + } + + private void reinitServices(Map configs) { + Services.close(); + Services.configure(configs); + } + private void logStart(String msg) { System.out.println(" ==== " + msg); }