Skip to content

Commit

Permalink
spring-projectsGH-1638: Add RetryListener
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Jan 26, 2021
1 parent 01e8572 commit a8a4241
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
this.failureTracker.setResetStateOnExceptionChange(resetStateOnExceptionChange);
}

/**
* Set one or more {@link RetryListener} to receive notifications of retries and
* recovery.
* @param listeners the listeners.
* @since 2.7
*/
public void setRetryListeners(RetryListener... listeners) {
this.failureTracker.setRetryListeners(listeners);
}

@Override
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
return this.failureTracker.deliveryAttempt(topicPartitionOffset);
Expand Down Expand Up @@ -244,11 +254,14 @@ protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> record
else {
try {
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
}
catch (Exception ex) {
if (records.size() > 0) {
this.logger.error(ex, () -> "Recovery of record ("
+ ListenerUtils.recordToString(records.get(0)) + ") failed");
this.failureTracker.getRetryListeners().forEach(rl ->
rl.recoveryFailed(records.get(0), thrownException, ex));
}
return (rec, excep, cont) -> NEVER_SKIP_PREDICATE.test(rec, excep);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -47,6 +50,8 @@ class FailedRecordTracker implements RecoveryStrategy {

private final boolean noRetries;

private final List<RetryListener> retryListeners = new ArrayList<>();

private final BackOff backOff;

private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction;
Expand Down Expand Up @@ -114,6 +119,21 @@ public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange)
this.resetStateOnExceptionChange = resetStateOnExceptionChange;
}

/**
* Set one or more {@link RetryListener} to receive notifications of retries and
* recovery.
* @param listeners the listeners.
* @since 2.7
*/
public void setRetryListeners(RetryListener... listeners) {
this.retryListeners.clear();
this.retryListeners.addAll(Arrays.asList(listeners));
}

List<RetryListener> getRetryListeners() {
return this.retryListeners;
}

boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
try {
return recovered(record, exception, null);
Expand All @@ -139,6 +159,8 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
}
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
this.retryListeners.forEach(rl ->
rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != BackOffExecution.STOP) {
if (container == null) {
Expand Down Expand Up @@ -194,8 +216,10 @@ private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exceptio
private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
try {
this.recoverer.accept(record, exception);
this.retryListeners.forEach(rl -> rl.recovered(record, exception));
}
catch (RuntimeException e) {
this.retryListeners.forEach(rl -> rl.recoveryFailed(record, exception, e));
if (tp != null && this.resetStateOnRecoveryFailure) {
this.failures.get().remove(tp);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A listener for retry activity.
*
* @author Gary Russell
* @since 2.7
*
*/
@FunctionalInterface
public interface RetryListener {

/**
* Called after a delivery failed for a record.
* @param record the failed record.
* @param ex the exception.
* @param deliveryAttempt the delivery attempt.
*/
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

/**
* Called after a failing record was successfully recovered.
* @param record the record.
* @param ex the exception.
*/
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}

/**
* Called after a recovery attempt failed.
* @param record the record.
* @param original the original exception causing the recovery attempt.
* @param failure the exception thrown by the recoverer.
*/
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -113,7 +113,9 @@ void seekAndRecover() throws Exception {
"foo", "bar", "baz", "qux", "fiz", "buz",
"baz", "qux", "fiz", "buz",
"qux", "fiz", "buz");
assertThat(config.recovered.value()).isEqualTo("baz");
assertThat(this.config.recovered.value()).isEqualTo("baz");
assertThat(this.config.listenerFailed.value()).isEqualTo("baz");
assertThat(this.config.listenerRecovered.value()).isEqualTo("baz");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -200,6 +202,10 @@ public static class Config {

volatile ConsumerRecord<?, ?> recovered;

volatile ConsumerRecord<?, ?> listenerFailed;

volatile ConsumerRecord<?, ?> listenerRecovered;

@KafkaListener(id = CONTAINER_ID, topics = "foo")
public void foo(List<String> in) {
received.addAll(in);
Expand Down Expand Up @@ -279,8 +285,22 @@ public Consumer consumer() {
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchErrorHandler(new RecoveringBatchErrorHandler((cr, ex) -> this.recovered = cr,
new FixedBackOff(0, 1)));
RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler((cr, ex) -> this.recovered = cr,
new FixedBackOff(0, 1));
errorHandler.setRetryListeners(new RetryListener() {

@Override
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
Config.this.listenerFailed = record;
}

@Override
public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
Config.this.listenerRecovered = record;
}

});
factory.setBatchErrorHandler(errorHandler);
factory.setBatchListener(true);
factory.getContainerProperties().setSubBatchPerPartition(false);
factory.setMissingTopicsFatal(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -60,6 +61,27 @@ public void testClassifier() {
}
recovered.set(r);
});
AtomicInteger failedDeliveryAttempt = new AtomicInteger();
AtomicReference<Exception> recoveryFailureEx = new AtomicReference<>();
AtomicBoolean isRecovered = new AtomicBoolean();
handler.setRetryListeners(new RetryListener() {

@Override
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
failedDeliveryAttempt.set(deliveryAttempt);
}

@Override
public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
isRecovered.set(true);
}

@Override
public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
recoveryFailureEx.set(failure);
}

});
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
Expand Down Expand Up @@ -88,6 +110,12 @@ consumer, mock(MessageListenerContainer.class)))
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // recovery failed
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
inOrder.verifyNoMoreInteractions();
assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.isEqualTo("test recoverer failure");
assertThat(isRecovered.get()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -180,13 +181,8 @@ public void seekToCurrentErrorHandlerRecovers() {
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Consumer<?, ?> consumer = mock(Consumer.class);
try {
eh.handle(new RuntimeException(), records, consumer, null);
fail("Expected exception");
}
catch (@SuppressWarnings("unused") KafkaException e) {
// NOSONAR
}
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
verifyNoMoreInteractions(consumer);
eh.handle(new RuntimeException(), records, consumer, null);
Expand All @@ -207,6 +203,27 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() {
return null;
}).given(recoverer).accept(any(), any());
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
AtomicInteger failedDeliveryAttempt = new AtomicInteger();
AtomicReference<Exception> recoveryFailureEx = new AtomicReference<>();
AtomicBoolean isRecovered = new AtomicBoolean();
eh.setRetryListeners(new RetryListener() {

@Override
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
failedDeliveryAttempt.set(deliveryAttempt);
}

@Override
public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
isRecovered.set(true);
}

@Override
public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
recoveryFailureEx.set(failure);
}

});
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Expand All @@ -226,6 +243,12 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() {
verify(consumer).seek(new TopicPartition("foo", 0), 1L);
verifyNoMoreInteractions(consumer);
verify(recoverer, times(2)).accept(eq(records.get(0)), any());
assertThat(failedDeliveryAttempt.get()).isEqualTo(2);
assertThat(recoveryFailureEx.get())
.isInstanceOf(RuntimeException.class)
.extracting(ex -> ex.getMessage())
.isEqualTo("recovery failed");
assertThat(isRecovered.get()).isTrue();
}

@Test
Expand Down
Loading

0 comments on commit a8a4241

Please sign in to comment.