Minimise initial grants requests for 'same token, different session' case (strimzi#180)

* Minimise initial grants requests for 'same token, different session' case

If grants for the token exist in another session, reuse them, rather than fetch them again.
Also introduce a per-access-token semaphore to prevent parallel fetching of grants for the same token.

Add `strimzi.authorization.reuse.grants` config option to enable this optimization.

Co-authored-by: Jakub Scholz <[email protected]>
Signed-off-by: Marko Strukelj <[email protected]>
mstruk and scholzj authored Jan 13, 2023
1 parent 3fa85fc commit 848df0b
Showing 11 changed files with 225 additions and 17 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -700,6 +700,17 @@ since they are not yet available. The following option enables reattempting the
receive the `AuthorizationException`. The default value is '0', meaning 'no retries'. Provide a value greater than '0' to set the number of repeated attempts:
- `strimzi.authorization.http.retries` (e.g.: "1" - if initial fetching of grants for the session fails, immediately retry one more time)

A single client typically uses one access token for all of its concurrent sessions to the Kafka broker.
As a result, the number of active tokens on a broker is generally lower than the number of active sessions (connections).
Keep in mind, however, that this applies to every Kafka broker in the cluster, since clients maintain active sessions to multiple brokers.

By default, each new session requests the latest grants from Keycloak so that any change in permissions is reflected immediately.
You can change this behaviour and reuse the grants already fetched for the token, provided the same token has already been used
for another session on the broker. This can noticeably reduce the load that brokers place on Keycloak, and can also help alleviate the 'glitchiness' issues
addressed by `strimzi.authorization.http.retries`. The trade-off is that the grants initially used for the new session may be out of sync with
Keycloak for up to `strimzi.authorization.grants.refresh.period.seconds`.
- `strimzi.authorization.reuse.grants` (e.g.: "true" - if enabled, grants fetched for another session may be used)
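
As a rough illustration of how the three options mentioned above fit together, the sketch below assembles them as Java `Properties` and reads the reuse flag with a default of `false`, which matches the default this commit uses in `configure()`. The class `ReuseGrantsConfigSketch`, its helper methods, and the refresh period value `60` are hypothetical and only serve the example; they are not part of the authorizer.

```java
import java.util.Properties;

class ReuseGrantsConfigSketch {

    static final String REUSE_GRANTS = "strimzi.authorization.reuse.grants";
    static final String REFRESH_PERIOD = "strimzi.authorization.grants.refresh.period.seconds";
    static final String HTTP_RETRIES = "strimzi.authorization.http.retries";

    // Hypothetical helper: a missing option is treated as "false"
    static boolean isReuseGrantsEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(REUSE_GRANTS, "false"));
    }

    // Example values only; the refresh period bounds how long reused grants may be stale
    static Properties exampleBrokerProperties() {
        Properties props = new Properties();
        props.setProperty(REUSE_GRANTS, "true");
        props.setProperty(REFRESH_PERIOD, "60");
        props.setProperty(HTTP_RETRIES, "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleBrokerProperties();
        System.out.println(REUSE_GRANTS + " enabled: " + isReuseGrantsEnabled(props));
    }
}
```

With these example values, a new session that reuses grants could see permissions that lag behind Keycloak by up to the configured refresh period.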

You may also want to set a logical cluster name so that authorization rules can target it:
- `strimzi.authorization.kafka.cluster.name` (e.g.: "dev-cluster" - a logical name of the cluster which can be targeted with authorization services resource definitions, and permission policies)

@@ -72,4 +72,23 @@ public void cleanupExpired() {
            }
        }
    }

    /**
     * Iterate over sessions, and find the first element matching the filter.
     *
     * @param filter A filter to apply
     * @return The first matching session, or null if no session matches
     */
    public BearerTokenWithPayload findFirst(Predicate<BearerTokenWithPayload> filter) {
        // In order to prevent a possible ConcurrentModificationException in the middle of using an iterator
        // we first make a local copy, then iterate over the copy
        ArrayList<BearerTokenWithPayload> values = new ArrayList<>(activeSessions.keySet());

        for (BearerTokenWithPayload token: values) {
            if (filter.test(token)) {
                return token;
            }
        }
        return null;
    }
}
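
The copy-then-iterate approach used by `findFirst` can be sketched in isolation as follows. This is a minimal stand-in, not the real `Sessions` class: the registry here is assumed to be a synchronized `LinkedHashMap` keyed by plain strings, and the names `SnapshotFindFirstSketch`, `add`, and `remove` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class SnapshotFindFirstSketch {

    // Stand-in for the session registry; the keys play the role of session tokens
    private final Map<String, Boolean> active =
            Collections.synchronizedMap(new LinkedHashMap<>());

    void add(String token) {
        active.put(token, Boolean.TRUE);
    }

    void remove(String token) {
        active.remove(token);
    }

    String findFirst(Predicate<String> filter) {
        // Copy the keys first, so that later changes to the map cannot
        // invalidate the iterator we are about to use
        List<String> snapshot = new ArrayList<>(active.keySet());
        for (String token : snapshot) {
            if (filter.test(token)) {
                return token;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SnapshotFindFirstSketch sessions = new SnapshotFindFirstSketch();
        sessions.add("a");
        sessions.add("b");
        sessions.add("c");
        // The filter removes an entry mid-iteration; the snapshot is unaffected
        System.out.println(sessions.findFirst(t -> {
            sessions.remove("c");
            return t.equals("b");
        }));
    }
}
```

The cost of this approach is one list allocation per lookup, and the result reflects the registry as it was when the copy was taken.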
@@ -29,6 +29,7 @@ public class AuthzConfig extends Config {
public static final String STRIMZI_AUTHORIZATION_CONNECT_TIMEOUT_SECONDS = "strimzi.authorization.connect.timeout.seconds";
public static final String STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS = "strimzi.authorization.read.timeout.seconds";
public static final String STRIMZI_AUTHORIZATION_ENABLE_METRICS = "strimzi.authorization.enable.metrics";
public static final String STRIMZI_AUTHORIZATION_REUSE_GRANTS = "strimzi.authorization.reuse.grants";

AuthzConfig() {}

@@ -182,6 +182,8 @@ public class KeycloakRBACAuthorizer extends AclAuthorizer {

private int httpRetries;

private boolean reuseGrants;

// Turning it to false will not enforce access token expiry time (only for debugging purposes during development)
private final boolean denyWhenTokenInvalid = true;

@@ -191,7 +193,7 @@ public class KeycloakRBACAuthorizer extends AclAuthorizer {
private boolean enableMetrics;
private SensorKeyProducer authzSensorKeyProducer;
private SensorKeyProducer grantsSensorKeyProducer;

private final Semaphores<JsonNode> semaphores = new Semaphores<>();
public KeycloakRBACAuthorizer() {
super();
}
@@ -245,6 +247,8 @@ public void configure(Map<String, ?> configs) {
setupRefreshGrantsJob(grantsRefreshPeriodSeconds);
}

reuseGrants = config.getValueAsBoolean(AuthzConfig.STRIMZI_AUTHORIZATION_REUSE_GRANTS, false);

configureHttpRetries(config);

configureMetrics(configs, config);
@@ -267,6 +271,7 @@ public void configure(Map<String, ?> configs) {
+ "\n grantsRefreshPeriodSeconds: " + grantsRefreshPeriodSeconds
+ "\n grantsRefreshPoolSize: " + grantsRefreshPoolSize
+ "\n httpRetries: " + httpRetries
+ "\n reuseGrants: " + reuseGrants
+ "\n connectTimeoutSeconds: " + connectTimeoutSeconds
+ "\n readTimeoutSeconds: " + readTimeoutSeconds
+ "\n enableMetrics: " + enableMetrics
@@ -329,7 +334,7 @@ private void configureMetrics(Map<String, ?> configs, AuthzConfig config) {
/**
* This method extracts the key=value configuration entries relevant for KeycloakRBACAuthorizer from
* Kafka properties configuration file (server.properties) and wraps them with AuthzConfig instance.
* <p>
* Any new config options have to be added here in order to become visible, otherwise they will be ignored.
*
* @param configs Kafka configs map
@@ -344,6 +349,7 @@ static AuthzConfig convertToCommonConfig(Map<String, ?> configs) {
AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS,
AuthzConfig.STRIMZI_AUTHORIZATION_GRANTS_REFRESH_POOL_SIZE,
AuthzConfig.STRIMZI_AUTHORIZATION_HTTP_RETRIES,
AuthzConfig.STRIMZI_AUTHORIZATION_REUSE_GRANTS,
AuthzConfig.STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL,
AuthzConfig.STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME,
AuthzConfig.STRIMZI_AUTHORIZATION_CLIENT_ID,
@@ -406,10 +412,10 @@ static HostnameVerifier createHostnameVerifier(Config config) {

/**
* The method that makes the authorization decision.
* <p>
* We assume authorize() is thread-safe in the sense that there will not be two concurrent threads
* calling it at the same time for the same session.
* <p>
* Should that not be the case, the side effect could be to make more calls to token endpoint than necessary.
* Other than that it should not affect proper functioning of this authorizer.
*
@@ -549,11 +555,60 @@ private boolean denyIfTokenInvalid(BearerTokenWithPayload token) {
return false;
}



    private JsonNode handleFetchingGrants(BearerTokenWithPayload token) {
        // Fetch authorization grants
        Semaphores.SemaphoreResult<JsonNode> semaphore = semaphores.acquireSemaphore(token.value());

        // Try to acquire semaphore for fetching grants
        if (semaphore.acquired()) {
            // If acquired
            try {
                JsonNode grants = null;
                if (reuseGrants) {
                    // If reuseGrants is enabled, first try to get the grants from one of the existing sessions having the same access token
                    grants = lookupGrantsInExistingSessions(token);
                }
                if (grants == null) {
                    // If grants not available it is on us to fetch (others may be waiting)
                    grants = fetchAndStoreGrants(token);
                } else {
                    log.debug("Found existing grants for the token on another session");
                }

                semaphore.future().complete(grants);
                return grants;

            } catch (Throwable t) {
                semaphore.future().completeExceptionally(t);
                throw t;
            } finally {
                semaphores.releaseSemaphore(token.value());
            }

        } else {
            try {
                log.debug("Waiting on another thread to get grants");
                return semaphore.future().get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof ServiceException) {
                    throw (ServiceException) cause;
                } else {
                    throw new ServiceException("ExecutionException waiting for grants result: ", e);
                }
            } catch (InterruptedException e) {
                throw new ServiceException("InterruptedException waiting for grants result: ", e);
            }
        }
    }
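
The leader/follower flow in `handleFetchingGrants` can be illustrated with a reduced, self-contained sketch. It assumes string values instead of `JsonNode`, a generic `Supplier` in place of the Keycloak call, and unchecked exceptions only; `SingleFetchSketch`, `Slot`, `getOrWait`, and `inFlightCount` are hypothetical names, not part of the authorizer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SingleFetchSketch {

    private static final class Slot {
        final CompletableFuture<String> result = new CompletableFuture<>();
        final AtomicBoolean unclaimed = new AtomicBoolean(true);
    }

    private final ConcurrentHashMap<String, Slot> inFlight = new ConcurrentHashMap<>();

    String getOrWait(String key, Supplier<String> fetcher) {
        Slot slot = inFlight.computeIfAbsent(key, k -> new Slot());
        if (slot.unclaimed.getAndSet(false)) {
            // Leader: the first caller for this key performs the fetch
            try {
                String value = fetcher.get();
                slot.result.complete(value);
                return value;
            } catch (RuntimeException e) {
                // Wake up any followers with the same failure
                slot.result.completeExceptionally(e);
                throw e;
            } finally {
                // Later callers for this key start a fresh fetch
                inFlight.remove(key);
            }
        }
        // Follower: wait for the leader's outcome instead of fetching again
        try {
            return slot.result.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            throw cause instanceof RuntimeException
                    ? (RuntimeException) cause
                    : new IllegalStateException(cause);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        SingleFetchSketch sketch = new SingleFetchSketch();
        System.out.println(sketch.getOrWait("token-1", () -> "grants"));
    }
}
```

Because the entry is removed as soon as the leader finishes, only callers that overlap with an in-progress fetch share its result. A caller arriving afterwards becomes a new leader, which is where the optional lookup in existing sessions can avoid a second round trip.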

    private JsonNode fetchAndStoreGrants(BearerTokenWithPayload token) {
        // If no grants found, fetch grants from server
        JsonNode grants = null;
        try {
            log.debug("Fetching grants from Keycloak");
            grants = fetchAuthorizationGrants(token.value());
            if (grants == null) {
                log.debug("Received null grants for token: {}", mask(token.value()));
@@ -567,13 +622,21 @@ private JsonNode handleFetchingGrants(BearerTokenWithPayload token) {
            }
        }
        if (grants != null) {
            // Store authz grants in the token, so they are available for subsequent requests
            log.debug("Saving non-null grants for token: {}", mask(token.value()));
            token.setPayload(grants);
        }
        return grants;
    }

    private static JsonNode lookupGrantsInExistingSessions(BearerTokenWithPayload token) {
        Sessions sessions = Services.getInstance().getSessions();
        BearerTokenWithPayload existing = sessions.findFirst(t ->
            t.value().equals(token.value()) && t.getPayload() != null
        );
        return existing != null ? (JsonNode) existing.getPayload() : null;
    }

static List<ScopesSpec.AuthzScope> validateScopes(List<String> scopes) {
List<ScopesSpec.AuthzScope> enumScopes = new ArrayList<>(scopes.size());
for (String name: scopes) {
@@ -649,11 +712,11 @@ private void logDenied(Logger logger, AuthorizableRequestContext context, JsonNo
/**
* Method that performs the POST request to fetch grants for the token.
* In case of a connection failure or a non-200 status response this method immediately retries the request if so configured.
* <p>
* Status 401 does not trigger a retry since it is used to signal an invalid token.
* Status 403 does not trigger a retry either since it signals no permissions.
*
* @param token The raw access token
* @return Grants JSON response
*/
private JsonNode fetchAuthorizationGrants(String token) {
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2019, Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.oauth.server.authorizer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class Semaphores<T> {

    private final ConcurrentHashMap<String, Semaphore<T>> futures = new ConcurrentHashMap<>();

    // The first caller for a key acquires the semaphore; later callers share its future
    SemaphoreResult<T> acquireSemaphore(String key) {
        Semaphore<T> semaphore = futures.computeIfAbsent(key, v -> new Semaphore<>());
        return new SemaphoreResult<>(semaphore);
    }

    // Removing the entry lets the next caller for this key start afresh
    void releaseSemaphore(String key) {
        futures.remove(key);
    }

    static class Semaphore<T> {

        private final CompletableFuture<T> future = new CompletableFuture<>();
        // Starts as 'true', meaning the semaphore is still available
        private final AtomicBoolean acquired = new AtomicBoolean(true);

        private Semaphore() {}

        // Returns true for the first caller only, false for everyone after
        private boolean tryAcquire() {
            return acquired.getAndSet(false);
        }
    }

    static class SemaphoreResult<T> {

        private final boolean acquired;
        private final CompletableFuture<T> future;

        private SemaphoreResult(Semaphore<T> semaphore) {
            this.acquired = semaphore.tryAcquire();
            this.future = semaphore.future;
        }

        boolean acquired() {
            return acquired;
        }

        CompletableFuture<T> future() {
            return future;
        }
    }
}
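
The `tryAcquire()` method relies on `AtomicBoolean.getAndSet(false)` returning `true` to exactly one caller. The sketch below exercises that property with several threads released at the same moment. `FirstCallerWinsSketch` and `countWinners` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class FirstCallerWinsSketch {

    // Starts the given number of threads and counts how many of them see 'true'
    static int countWinners(int threads) {
        AtomicBoolean available = new AtomicBoolean(true);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Atomic read-and-clear: only one thread can observe 'true'
                    if (available.getAndSet(false)) {
                        winners.incrementAndGet();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

Since the read and the write happen as one atomic step, no additional locking is needed to decide which thread performs the fetch.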
@@ -656,7 +656,7 @@ public int getReadTimeout() {
static class BearerTokenWithPayloadImpl implements BearerTokenWithPayload {

private final TokenInfo ti;
private volatile Object payload;

BearerTokenWithPayloadImpl(TokenInfo ti) {
if (ti == null) {
@@ -666,12 +666,12 @@ static class BearerTokenWithPayloadImpl implements BearerTokenWithPayload {
}

@Override
public synchronized Object getPayload() {
return payload;
}

@Override
public synchronized void setPayload(Object value) {
payload = value;
}

2 changes: 2 additions & 0 deletions testsuite/keycloak-authz-tests/docker-compose.yml
@@ -115,6 +115,8 @@ services:
- KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS=10
# If a grants fetch fails, immediately perform one retry
- KAFKA_STRIMZI_AUTHORIZATION_HTTP_RETRIES=1
# Use grants fetched for another session if available
- KAFKA_STRIMZI_AUTHORIZATION_REUSE_GRANTS=true

- KAFKA_STRIMZI_AUTHORIZATION_ENABLE_METRICS=true

@@ -34,6 +34,9 @@ public void doTest() {

value = getLoggerAttribute(lines, "httpRetries");
Assert.assertEquals("'httpRetries' should be 1", "1", value);

value = getLoggerAttribute(lines, "reuseGrants");
Assert.assertEquals("'reuseGrants' should be true", "true", value);
}

private static String getLoggerAttribute(List<String> lines, String name) {
@@ -20,11 +20,13 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static io.strimzi.kafka.oauth.common.OAuthAuthenticator.loginWithClientSecret;
import static io.strimzi.testsuite.oauth.common.TestUtil.getContainerLogsForString;

public class FloodTest extends Common {

@@ -36,9 +38,11 @@ public class FloodTest extends Common {

static int sendLimit = 1;

private final String kafkaContainer;

FloodTest(String kafkaContainer, String kafkaBootstrap, boolean oauthOverPlain) {
super(kafkaBootstrap, oauthOverPlain);
this.kafkaContainer = kafkaContainer;
}

public void doTest() throws IOException {
@@ -81,6 +85,7 @@ void clientCredentialsWithFloodTest() throws IOException {
Assert.assertTrue("Exception type should be AuthorizationException", e.getCause() instanceof AuthorizationException);
}


// Do 5 iterations - each time hitting the broker with 10 parallel requests
for (int run = 0; run < 5; run++) {
System.out.println("\n*** Run " + (run + 1) + "/5\n");
@@ -132,6 +137,16 @@ void clientCredentialsWithFloodTest() throws IOException {
}
}

private int currentFoundExistingGrantsLogCount() {
List<String> lines = getContainerLogsForString(kafkaContainer, "Found existing grants for the token on another session");
return lines.size();
}

private int currentSemaphoreBlockLogCount() {
List<String> lines = getContainerLogsForString(kafkaContainer, "Waiting on another thread to get grants");
return lines.size();
}
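
The two helpers above return the current number of matching log lines, which suggests comparing counts taken before and after a test run. The sketch below shows that idea on in-memory lines. The implementation of `getContainerLogsForString` is not shown in this commit, so `LogCountSketch`, `linesContaining`, and `countDelta` are hypothetical stand-ins rather than the test suite's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class LogCountSketch {

    // Hypothetical stand-in for a log query: keep only the lines containing the marker
    static List<String> linesContaining(List<String> logLines, String marker) {
        return logLines.stream()
                .filter(line -> line.contains(marker))
                .collect(Collectors.toList());
    }

    // Number of new marker lines that appeared between two log snapshots
    static int countDelta(List<String> before, List<String> after, String marker) {
        return linesContaining(after, marker).size() - linesContaining(before, marker).size();
    }

    public static void main(String[] args) {
        String marker = "Waiting on another thread to get grants";
        List<String> before = Arrays.asList("startup", marker);
        List<String> after = Arrays.asList("startup", marker, "DEBUG " + marker);
        System.out.println("new occurrences: " + countDelta(before, after, marker));
    }
}
```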

private void sendSingleMessage(String clientId, String secret, String topic) throws ExecutionException, InterruptedException {
Properties props = buildProducerConfig(kafkaBootstrap, usePlain, clientId, secret);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
@@ -63,8 +63,9 @@ public void doTest() throws Exception {
logStart("KeycloakAuthorizationTest :: MetricsTest");
MetricsTest.doTest();

// This test assumes that it is the first producing and consuming test
logStart("KeycloakAuthorizationTest :: MultiSaslTests");
new MultiSaslTest(kafkaContainer).doTest();

logStart("KeycloakAuthorizationTest :: JwtValidationAuthzTest");
new BasicTest(JWT_LISTENER, false).doTest();
@@ -79,13 +80,13 @@ public void doTest() throws Exception {
new OAuthOverPlainTest(INTROSPECTPLAIN_LISTENER, true).doTest();

logStart("KeycloakAuthorizationTest :: OAuthOverPLain + FloodTest");
new FloodTest(kafkaContainer, JWTPLAIN_LISTENER, true).doTest();

logStart("KeycloakAuthorizationTest :: JWT FloodTest");
new FloodTest(kafkaContainer, JWT_LISTENER, false).doTest();

logStart("KeycloakAuthorizationTest :: Introspection FloodTest");
new FloodTest(kafkaContainer, INTROSPECT_LISTENER, false).doTest();

// This test has to be the last one - it changes the team-a-client, and team-b-client permissions in Keycloak
logStart("KeycloakAuthorizationTest :: JwtValidationAuthzTest + RefreshGrants");