diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index b9ede8aa95..fa51399de7 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -175,23 +175,23 @@ From v5, by default, the finalizer is added using Server Side Apply. See also `U It is typical to want to update the status subresource with the information that is available during the reconciliation. This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's -cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets +cache. It can, therefore, happen that, if other events trigger other reconciliations, before the informer cache gets updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still -require the latest status version possible, for example if the status subresource is used as a communication mechanism, -see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) +require the latest status version possible, for example, if the status subresource is used to store allocated values. +See [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) from the Kubernetes docs for more details. -The framework provides utilities to help with these use cases with -[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java). -These utility methods come in two flavors: +The framework provides the +[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java) utility class +to help with these use cases. -#### Using internal cache - -In almost all cases for this purpose, you can use internal caches: +This class' methods use internal caches in combination with update methods that leveraging +optimistic locking. If the update method fails on optimistic locking, it will retry +using a fresh resource from the server as base for modification. ```java - @Override +@Override public UpdateControl reconcile( StatusPatchCacheCustomResource resource, Context context) { @@ -201,85 +201,17 @@ public UpdateControl reconcile( var freshCopy = createFreshCopy(primary); freshCopy.getStatus().setValue(statusWithState()); - var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); - - return UpdateControl.noUpdate(); - } -``` - -In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal -cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it -is not necessarily the version of the resource you got as response from the update, it can be newer since other parties -can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status. - -See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal). - -This approach works with the default configuration of the framework and should be good to go in most of the cases. -Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution: - -#### Fallback approach: using `PrimaryResourceCache` cache - -As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -needs to be set to `false` you can use an explicit caching approach: - -```java - -// We on purpose don't use the provided predicate to show what a custom one could look like. - private final PrimaryResourceCache cache = - new PrimaryResourceCache<>( - (statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> - statusPatchCacheCustomResource.getStatus().getValue() - >= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); - - @Override - public UpdateControl reconcile( - StatusPatchPrimaryCacheCustomResource primary, - Context context) { - - // cache will compare the current and the cached resource and return the more recent. (And evict the old) - primary = cache.getFreshResource(primary); - - // omitted logic - - var freshCopy = createFreshCopy(primary); + var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); - freshCopy.getStatus().setValue(statusWithState()); - - var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); - return UpdateControl.noUpdate(); } - - @Override - public DeleteControl cleanup( - StatusPatchPrimaryCacheCustomResource resource, - Context context) - throws Exception { - // cleanup the cache on resource deletion - cache.cleanup(resource); - return DeleteControl.defaultDelete(); - } - ``` -[`PrimaryResourceCache`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java) -is designed for this purpose. As shown in the example above, it is up to you to provide a predicate to determine if the -resource is more recent than the one available. In other words, when to evict the resource from the cache. Typically, as -shown in -the [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache) -you can have a counter in status to check on that. - -Since all of this happens explicitly, you cannot use this approach for managed dependent resources and workflows and -will need to use the unmanaged approach instead. This is due to the fact that managed dependent resources always get -their associated primary resource from the underlying informer event source cache. - -#### Additional remarks +After the update `PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource` puts the result of the update into an internal +cache and the framework will make sure that the next reconciliation contains the most recent version of the resource. +Note that it is not necessarily the same version returned as response from the update, it can be a newer version since other parties +can do additional updates meanwhile. However, unless it has been explicitly modified, that +resource will contain the up-to-date status. -As shown in the integration tests, there is no optimistic locking used when updating the -[resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41) -(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on -update. -In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client). +See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache). diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 174f7667f6..ac0fe9675c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -1,146 +1,85 @@ package io.javaoperatorsdk.operator.api.reconciler; -import java.util.function.Supplier; import java.util.function.UnaryOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; -import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; +import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.ResourceID; /** * Utility methods to patch the primary resource state and store it to the related cache, to make - * sure that fresh resource is present for the next reconciliation. The main use case for such - * updates is to store state is resource status. Use of optimistic locking is not desired for such - * updates, since we don't want to patch fail and lose information that we want to store. + * sure that the latest version of the resource is present for the next reconciliation. The main use + * case for such updates is to store state is resource status. + * + *

The way the framework handles this is with retryable updates with optimistic locking, and + * caches the updated resource from the response in an overlay cache on top of the Informer cache. + * If the update fails, it reads the primary resource from the cluster, applies the modifications + * again and retries the update. */ public class PrimaryUpdateAndCacheUtils { + public static final int DEFAULT_MAX_RETRY = 10; + private PrimaryUpdateAndCacheUtils() {} private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using update (PUT) method. - * - * @param primary resource - * @param context of reconciliation - * @return updated resource - * @param

primary resource type - */ - public static

P updateAndCacheStatus(P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, context, () -> context.getClient().resource(primary).updateStatus()); - } - - /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Merge patch. - * - * @param primary resource - * @param context of reconciliation - * @return updated resource - * @param

primary resource type + * Updates the status with optimistic locking and caches the result for next reconciliation. For + * details see {@link #updateAndCacheResource}. */ - public static

P patchAndCacheStatus(P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, context, () -> context.getClient().resource(primary).patchStatus()); + public static

P updateStatusAndCacheResource( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResource( + primary, + context, + modificationFunction, + r -> context.getClient().resource(r).updateStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Patch. - * - * @param primary resource - * @param context of reconciliation - * @return updated resource - * @param

primary resource type + * Patches the status using JSON Merge Patch with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResource}. */ - public static

P editAndCacheStatus( - P primary, Context

context, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, context, () -> context.getClient().resource(primary).editStatus(operation)); + public static

P mergePatchStatusAndCacheResource( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResource( + primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * - * @param primary resource - * @param context of reconciliation - * @param patch free implementation of cache - * @return the updated resource. - * @param

primary resource type + * Patches the status using JSON Patch with optimistic locking and caches the result for next + * reconciliation. For details see {@link #updateAndCacheResource}. */ - public static

P patchAndCacheStatus( - P primary, Context

context, Supplier

patch) { - var updatedResource = patch.get(); - context - .eventSourceRetriever() - .getControllerEventSource() - .handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary); - return updatedResource; + public static

P patchStatusAndCacheResource( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResource( + primary, + context, + UnaryOperator.identity(), + r -> context.getClient().resource(r).editStatus(modificationFunction)); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using Server Side Apply. - * - * @param primary resource - * @param freshResourceWithStatus - fresh resource with target state - * @param context of reconciliation - * @return the updated resource. - * @param

primary resource type + * Patches the status using Server Side Apply with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResource}. */ - public static

P ssaPatchAndCacheStatus( + public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); - var res = - context - .getClient() - .resource(freshResourceWithStatus) - .subresource("status") - .patch( - new PatchContext.Builder() - .withForce(true) - .withFieldManager(context.getControllerConfiguration().fieldManager()) - .withPatchType(PatchType.SERVER_SIDE_APPLY) - .build()); - - context - .eventSourceRetriever() - .getControllerEventSource() - .handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); - return res; - } - - /** - * Patches the resource and adds it to the {@link PrimaryResourceCache}. - * - * @param primary resource - * @param freshResourceWithStatus - fresh resource with target state - * @param context of reconciliation - * @param cache - resource cache managed by user - * @return the updated resource. - * @param

primary resource type - */ - public static

P ssaPatchAndCacheStatus( - P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); - return patchAndCacheStatus( + return updateAndCacheResource( primary, - cache, - () -> + context, + r -> freshResourceWithStatus, + r -> context .getClient() - .resource(freshResourceWithStatus) + .resource(r) .subresource("status") .patch( new PatchContext.Builder() @@ -151,75 +90,104 @@ public static

P ssaPatchAndCacheStatus( } /** - * Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}. - * - * @param primary resource - * @param context of reconciliation - * @param cache - resource cache managed by user - * @return the updated resource. - * @param

primary resource type - */ - public static

P editAndCacheStatus( - P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, cache, () -> context.getClient().resource(primary).editStatus(operation)); - } - - /** - * Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} - * provided. + * Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, + * int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}. * - * @param primary resource + * @param resourceToUpdate original resource to update * @param context of reconciliation - * @param cache - resource cache managed by user - * @return the updated resource. - * @param

primary resource type + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @param

primary type + * @return the updated resource */ - public static

P patchAndCacheStatus( - P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, cache, () -> context.getClient().resource(primary).patchStatus()); + public static

P updateAndCacheResource( + P resourceToUpdate, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod) { + return updateAndCacheResource( + resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY); } /** - * Updates the resource and adds it to the {@link PrimaryResourceCache}. + * Modifies the primary using the specified modification function, then uses the modified resource + * for the request to update with provided update method. As the {@code resourceVersion} field of + * the modified resource is set to the value found in the specified resource to update, the update + * operation will therefore use optimistic locking on the server. If the request fails on + * optimistic update, we read the resource again from the K8S API server and retry the whole + * process. In short, we make sure we always update the resource with optimistic locking, then we + * cache the resource in an internal cache. Without further going into details, the optimistic + * locking is needed so we can reliably handle the caching. * - * @param primary resource + * @param resourceToUpdate original resource to update * @param context of reconciliation - * @param cache - resource cache managed by user - * @return the updated resource. - * @param

primary resource type - */ - public static

P updateAndCacheStatus( - P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( - primary, cache, () -> context.getClient().resource(primary).updateStatus()); - } - - /** - * Updates the resource using the user provided implementation anc caches the result. - * - * @param primary resource - * @param cache resource cache managed by user - * @param patch implementation of resource update* - * @return the updated resource. - * @param

primary resource type + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @param maxRetry maximum number of retries before giving up + * @param

primary type + * @return the updated resource */ - public static

P patchAndCacheStatus( - P primary, PrimaryResourceCache

cache, Supplier

patch) { - var updatedResource = patch.get(); - cache.cacheResource(primary, updatedResource); - return updatedResource; - } - - private static

void logWarnIfResourceVersionPresent(P primary) { - if (primary.getMetadata().getResourceVersion() != null) { - log.warn( - "The metadata.resourceVersion of primary resource is NOT null, " - + "using optimistic locking is discouraged for this purpose. "); + @SuppressWarnings("unchecked") + public static

P updateAndCacheResource( + P resourceToUpdate, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod, + int maxRetry) { + + if (log.isDebugEnabled()) { + log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate)); + } + P modified = null; + int retryIndex = 0; + while (true) { + try { + modified = modificationFunction.apply(resourceToUpdate); + modified + .getMetadata() + .setResourceVersion(resourceToUpdate.getMetadata().getResourceVersion()); + var updated = updateMethod.apply(modified); + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate( + ResourceID.fromResource(resourceToUpdate), updated, resourceToUpdate); + return updated; + } catch (KubernetesClientException e) { + log.trace("Exception during patch for resource: {}", resourceToUpdate); + retryIndex++; + // only retry on conflict (409) and unprocessable content (422) which + // can happen if JSON Patch is not a valid request since there was + // a concurrent request which already removed another finalizer: + // List element removal from a list is by index in JSON Patch + // so if addressing a second finalizer but first is meanwhile removed + // it is a wrong request. + if (e.getCode() != 409 && e.getCode() != 422) { + throw e; + } + if (retryIndex > maxRetry) { + log.warn("Retry exhausted, last desired resource: {}", modified); + throw new OperatorException( + "Exceeded maximum (" + + maxRetry + + ") retry attempts to patch resource: " + + ResourceID.fromResource(resourceToUpdate), + e); + } + log.debug( + "Retrying patch for resource name: {}, namespace: {}; HTTP code: {}", + resourceToUpdate.getMetadata().getName(), + resourceToUpdate.getMetadata().getNamespace(), + e.getCode()); + resourceToUpdate = + (P) + context + .getClient() + .resources(resourceToUpdate.getClass()) + .inNamespace(resourceToUpdate.getMetadata().getNamespace()) + .withName(resourceToUpdate.getMetadata().getName()) + .get(); + } } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java deleted file mode 100644 index 4da73ab8b1..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java +++ /dev/null @@ -1,65 +0,0 @@ -package io.javaoperatorsdk.operator.api.reconciler.support; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiPredicate; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public class PrimaryResourceCache

{ - - private final BiPredicate, P> evictionPredicate; - private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); - - public PrimaryResourceCache(BiPredicate, P> evictionPredicate) { - this.evictionPredicate = evictionPredicate; - } - - public PrimaryResourceCache() { - this(new ResourceVersionParsingEvictionPredicate<>()); - } - - public void cacheResource(P afterUpdate) { - var resourceId = ResourceID.fromResource(afterUpdate); - cache.put(resourceId, new Pair<>(null, afterUpdate)); - } - - public void cacheResource(P beforeUpdate, P afterUpdate) { - var resourceId = ResourceID.fromResource(beforeUpdate); - cache.put(resourceId, new Pair<>(beforeUpdate, afterUpdate)); - } - - public P getFreshResource(P newVersion) { - var resourceId = ResourceID.fromResource(newVersion); - var pair = cache.get(resourceId); - if (pair == null) { - return newVersion; - } - if (!newVersion.getMetadata().getUid().equals(pair.afterUpdate().getMetadata().getUid())) { - cache.remove(resourceId); - return newVersion; - } - if (evictionPredicate.test(pair, newVersion)) { - cache.remove(resourceId); - return newVersion; - } else { - return pair.afterUpdate(); - } - } - - public void cleanup(P resource) { - cache.remove(ResourceID.fromResource(resource)); - } - - public record Pair(T beforeUpdate, T afterUpdate) {} - - /** This works in general, but it does not strictly follow the contract with k8s API */ - public static class ResourceVersionParsingEvictionPredicate - implements BiPredicate, T> { - @Override - public boolean test(Pair updatePair, T newVersion) { - return Long.parseLong(updatePair.afterUpdate().getMetadata().getResourceVersion()) - <= Long.parseLong(newVersion.getMetadata().getResourceVersion()); - } - } -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java new file mode 100644 index 0000000000..438941db9c --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java @@ -0,0 +1,111 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.function.UnaryOperator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; +import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + +import static io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils.DEFAULT_MAX_RETRY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class PrimaryUpdateAndCacheUtilsTest { + + Context context = mock(Context.class); + KubernetesClient client = mock(KubernetesClient.class); + Resource resource = mock(Resource.class); + + @BeforeEach + void setup() { + when(context.getClient()).thenReturn(client); + var esr = mock(EventSourceRetriever.class); + when(context.eventSourceRetriever()).thenReturn(esr); + when(esr.getControllerEventSource()).thenReturn(mock(ControllerEventSource.class)); + var mixedOp = mock(MixedOperation.class); + when(client.resources(any())).thenReturn(mixedOp); + when(mixedOp.inNamespace(any())).thenReturn(mixedOp); + when(mixedOp.withName(any())).thenReturn(resource); + when(resource.get()).thenReturn(TestUtils.testCustomResource1()); + } + + @Test + void handlesUpdate() { + var updated = + PrimaryUpdateAndCacheUtils.updateAndCacheResource( + TestUtils.testCustomResource1(), + context, + r -> { + var res = TestUtils.testCustomResource1(); + // setting this to null to test if value set in the implementation + res.getMetadata().setResourceVersion(null); + res.getSpec().setValue("updatedValue"); + return res; + }, + r -> { + // checks if the resource version is set from the original resource + assertThat(r.getMetadata().getResourceVersion()).isEqualTo("1"); + var res = TestUtils.testCustomResource1(); + res.setSpec(r.getSpec()); + res.getMetadata().setResourceVersion("2"); + return res; + }); + + assertThat(updated.getMetadata().getResourceVersion()).isEqualTo("2"); + assertThat(updated.getSpec().getValue()).isEqualTo("updatedValue"); + } + + @Test + void retriesConflicts() { + var updateOperation = mock(UnaryOperator.class); + + when(updateOperation.apply(any())) + .thenThrow(new KubernetesClientException("", 409, null)) + .thenReturn(TestUtils.testCustomResource1()); + + var updated = + PrimaryUpdateAndCacheUtils.updateAndCacheResource( + TestUtils.testCustomResource1(), + context, + r -> { + var res = TestUtils.testCustomResource1(); + res.getSpec().setValue("updatedValue"); + return res; + }, + updateOperation); + + assertThat(updated).isNotNull(); + verify(resource, times(1)).get(); + } + + @Test + void throwsIfRetryExhausted() { + var updateOperation = mock(UnaryOperator.class); + + when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null)); + + assertThrows( + OperatorException.class, + () -> + PrimaryUpdateAndCacheUtils.updateAndCacheResource( + TestUtils.testCustomResource1(), + context, + UnaryOperator.identity(), + updateOperation)); + verify(resource, times(DEFAULT_MAX_RETRY)).get(); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java deleted file mode 100644 index 58e3ce8a0a..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java +++ /dev/null @@ -1,87 +0,0 @@ -package io.javaoperatorsdk.operator.api.reconciler.support; - -import org.junit.jupiter.api.Test; - -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec; - -import static org.assertj.core.api.Assertions.assertThat; - -class PrimaryResourceCacheTest { - - PrimaryResourceCache versionParsingCache = - new PrimaryResourceCache<>( - new PrimaryResourceCache.ResourceVersionParsingEvictionPredicate<>()); - - @Test - void returnsThePassedValueIfCacheIsEmpty() { - var cr = customResource("1"); - - var res = versionParsingCache.getFreshResource(cr); - - assertThat(cr).isSameAs(res); - } - - @Test - void returnsTheCachedIfNotEvictedAccordingToPredicate() { - var cr = customResource("2"); - - versionParsingCache.cacheResource(cr); - - var res = versionParsingCache.getFreshResource(customResource("1")); - assertThat(cr).isSameAs(res); - } - - @Test - void ifMoreFreshPassedCachedIsEvicted() { - var cr = customResource("2"); - versionParsingCache.cacheResource(cr); - var newCR = customResource("3"); - - var res = versionParsingCache.getFreshResource(newCR); - var resOnOlder = versionParsingCache.getFreshResource(cr); - - assertThat(newCR).isSameAs(res); - assertThat(resOnOlder).isSameAs(cr); - assertThat(newCR).isNotSameAs(cr); - } - - @Test - void cleanupRemovesCachedResources() { - var cr = customResource("2"); - versionParsingCache.cacheResource(cr); - - versionParsingCache.cleanup(customResource("3")); - - var olderCR = customResource("1"); - var res = versionParsingCache.getFreshResource(olderCR); - assertThat(olderCR).isSameAs(res); - } - - @Test - void removesIfNewResourceWithDifferentUid() { - var cr = customResource("2"); - versionParsingCache.cacheResource(cr); - var crWithDifferentUid = customResource("1"); - cr.getMetadata().setUid("otheruid"); - - var res = versionParsingCache.getFreshResource(crWithDifferentUid); - - assertThat(res).isSameAs(crWithDifferentUid); - } - - private TestCustomResource customResource(String resourceVersion) { - var cr = new TestCustomResource(); - cr.setMetadata( - new ObjectMetaBuilder() - .withName("test1") - .withNamespace("default") - .withUid("uid") - .withResourceVersion(resourceVersion) - .build()); - cr.setSpec(new TestCustomResourceSpec()); - cr.getSpec().setKey("key"); - return cr; - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java similarity index 60% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java index 84b145cac3..8ab742a975 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockCustomResource.java @@ -1,4 +1,4 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; +package io.javaoperatorsdk.operator.baseapi.statuscache; import io.fabric8.kubernetes.api.model.Namespaced; import io.fabric8.kubernetes.client.CustomResource; @@ -8,7 +8,7 @@ @Group("sample.javaoperatorsdk") @Version("v1") -@ShortNames("spc") -public class StatusPatchPrimaryCacheCustomResource - extends CustomResource +@ShortNames("spwl") +public class StatusPatchCacheWithLockCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java similarity index 76% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java index f78511f250..c5752f4aae 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockIT.java @@ -1,4 +1,4 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.internal; +package io.javaoperatorsdk.operator.baseapi.statuscache; import java.time.Duration; @@ -11,19 +11,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -public class StatusPatchCacheIT { +public class StatusPatchCacheWithLockIT { public static final String TEST_1 = "test1"; @RegisterExtension LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder() - .withReconciler(StatusPatchCacheReconciler.class) + .withReconciler(StatusPatchCacheWithLockReconciler.class) .build(); @Test void testStatusAlwaysUpToDate() { - var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class); + var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class); extension.create(testResource()); @@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() { }); } - StatusPatchCacheCustomResource testResource() { - var res = new StatusPatchCacheCustomResource(); + StatusPatchCacheWithLockCustomResource testResource() { + var res = new StatusPatchCacheWithLockCustomResource(); res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); - res.setSpec(new StatusPatchCacheSpec()); + res.setSpec(new StatusPatchCacheWithLockSpec()); return res; } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java similarity index 60% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java index 8a3a72a901..364f8e9ff5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockReconciler.java @@ -1,4 +1,4 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.internal; +package io.javaoperatorsdk.operator.baseapi.statuscache; import java.util.List; @@ -9,18 +9,19 @@ import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @ControllerConfiguration -public class StatusPatchCacheReconciler implements Reconciler { +public class StatusPatchCacheWithLockReconciler + implements Reconciler { public volatile int latestValue = 0; public volatile boolean errorPresent = false; @Override - public UpdateControl reconcile( - StatusPatchCacheCustomResource resource, Context context) { + public UpdateControl reconcile( + StatusPatchCacheWithLockCustomResource resource, + Context context) { if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { errorPresent = true; @@ -31,33 +32,39 @@ public UpdateControl reconcile( + resource.getStatus().getValue()); } + // test also resource update happening meanwhile reconciliation + resource.getSpec().setCounter(resource.getSpec().getCounter() + 1); + context.getClient().resource(resource).update(); + var freshCopy = createFreshCopy(resource); freshCopy .getStatus() .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); - var updated = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); latestValue = updated.getStatus().getValue(); return UpdateControl.noUpdate(); } @Override - public List> prepareEventSources( - EventSourceContext context) { + public List> prepareEventSources( + EventSourceContext context) { // periodic event triggering for testing purposes return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); } - private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) { - var res = new StatusPatchCacheCustomResource(); + private StatusPatchCacheWithLockCustomResource createFreshCopy( + StatusPatchCacheWithLockCustomResource resource) { + var res = new StatusPatchCacheWithLockCustomResource(); res.setMetadata( new ObjectMetaBuilder() .withName(resource.getMetadata().getName()) .withNamespace(resource.getMetadata().getNamespace()) .build()); - res.setStatus(new StatusPatchCacheStatus()); + res.setStatus(new StatusPatchCacheWithLockStatus()); return res; } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java similarity index 60% rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java index d1426fd943..ebbabd49a0 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockSpec.java @@ -1,6 +1,6 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.internal; +package io.javaoperatorsdk.operator.baseapi.statuscache; -public class StatusPatchCacheSpec { +public class StatusPatchCacheWithLockSpec { private int counter = 0; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java new file mode 100644 index 0000000000..5f2d8f5a6f --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/StatusPatchCacheWithLockStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache; + +public class StatusPatchCacheWithLockStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchCacheWithLockStatus setValue(Integer value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java deleted file mode 100644 index 2a2d8b83fd..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.internal; - -import io.fabric8.kubernetes.api.model.Namespaced; -import io.fabric8.kubernetes.client.CustomResource; -import io.fabric8.kubernetes.model.annotation.Group; -import io.fabric8.kubernetes.model.annotation.ShortNames; -import io.fabric8.kubernetes.model.annotation.Version; - -@Group("sample.javaoperatorsdk") -@Version("v1") -@ShortNames("spcl") -public class StatusPatchCacheCustomResource - extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java deleted file mode 100644 index 00bc4b6f04..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.internal; - -public class StatusPatchCacheStatus { - - private Integer value = 0; - - public Integer getValue() { - return value; - } - - public StatusPatchCacheStatus setValue(Integer value) { - this.value = value; - return this; - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java deleted file mode 100644 index a884ec0758..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; - -import java.time.Duration; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; -import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -class StatusPatchPrimaryCacheIT { - - public static final String TEST_1 = "test1"; - - @RegisterExtension - LocallyRunOperatorExtension extension = - LocallyRunOperatorExtension.builder() - .withReconciler(StatusPatchPrimaryCacheReconciler.class) - .build(); - - @Test - void testStatusAlwaysUpToDate() { - var reconciler = extension.getReconcilerOfType(StatusPatchPrimaryCacheReconciler.class); - - extension.create(testResource()); - - // the reconciliation is periodically triggered, the status values should be increasing - // monotonically - await() - .pollDelay(Duration.ofSeconds(1)) - .pollInterval(Duration.ofMillis(30)) - .untilAsserted( - () -> { - assertThat(reconciler.errorPresent).isFalse(); - assertThat(reconciler.latestValue).isGreaterThan(10); - }); - } - - StatusPatchPrimaryCacheCustomResource testResource() { - var res = new StatusPatchPrimaryCacheCustomResource(); - res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); - res.setSpec(new StatusPatchPrimaryCacheSpec()); - return res; - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java deleted file mode 100644 index c25fcddfec..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java +++ /dev/null @@ -1,89 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; - -import java.util.List; - -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; -import io.javaoperatorsdk.operator.api.reconciler.Cleaner; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; -import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; - -@ControllerConfiguration -public class StatusPatchPrimaryCacheReconciler - implements Reconciler, - Cleaner { - - public volatile int latestValue = 0; - public volatile boolean errorPresent = false; - - // We on purpose don't use the provided predicate to show what a custom one could look like. - private final PrimaryResourceCache cache = - new PrimaryResourceCache<>( - (statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> - statusPatchCacheCustomResource.getStatus().getValue() - >= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); - - @Override - public UpdateControl reconcile( - StatusPatchPrimaryCacheCustomResource primary, - Context context) { - - primary = cache.getFreshResource(primary); - - if (primary.getStatus() != null && primary.getStatus().getValue() != latestValue) { - errorPresent = true; - throw new IllegalStateException( - "status is not up to date. Latest value: " - + latestValue - + " status values: " - + primary.getStatus().getValue()); - } - - var freshCopy = createFreshCopy(primary); - freshCopy - .getStatus() - .setValue(primary.getStatus() == null ? 1 : primary.getStatus().getValue() + 1); - - var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); - latestValue = updated.getStatus().getValue(); - - return UpdateControl.noUpdate(); - } - - @Override - public List> prepareEventSources( - EventSourceContext context) { - // periodic event triggering for testing purposes - return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); - } - - private StatusPatchPrimaryCacheCustomResource createFreshCopy( - StatusPatchPrimaryCacheCustomResource resource) { - var res = new StatusPatchPrimaryCacheCustomResource(); - res.setMetadata( - new ObjectMetaBuilder() - .withName(resource.getMetadata().getName()) - .withNamespace(resource.getMetadata().getNamespace()) - .build()); - res.setStatus(new StatusPatchPrimaryCacheStatus()); - - return res; - } - - @Override - public DeleteControl cleanup( - StatusPatchPrimaryCacheCustomResource resource, - Context context) - throws Exception { - cache.cleanup(resource); - return DeleteControl.defaultDelete(); - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java deleted file mode 100644 index 90630c1ae8..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; - -public class StatusPatchPrimaryCacheSpec { - - private boolean messageInStatus = true; - - public boolean isMessageInStatus() { - return messageInStatus; - } - - public StatusPatchPrimaryCacheSpec setMessageInStatus(boolean messageInStatus) { - this.messageInStatus = messageInStatus; - return this; - } -} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java deleted file mode 100644 index 0687d5576a..0000000000 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; - -public class StatusPatchPrimaryCacheStatus { - - private Integer value = 0; - - public Integer getValue() { - return value; - } - - public StatusPatchPrimaryCacheStatus setValue(Integer value) { - this.value = value; - return this; - } -}