Skip to content

Commit

Permalink
KAFKA-16448: Handle fatal user exception during processing error (apa…
Browse files Browse the repository at this point in the history
…che#16675)

This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

This PR catch the exceptions thrown while handling a processing exception

Co-authored-by: Dabz <[email protected]>
Co-authored-by: loicgreffier <[email protected]>

Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
sebastienviale authored Jul 31, 2024
1 parent e9d8109 commit 0dc9b9e
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,13 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());

final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
.handle(errorHandlerContext, record, e);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;

try {
response = processingExceptionHandler.handle(errorHandlerContext, record, e);
} catch (final Exception fatalUserException) {
throw new FailedProcessingException(fatalUserException);
}
if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -149,9 +151,150 @@ public void shouldContinueWhenProcessingExceptionOccurs() {
}
}

@Test
public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() {
final KeyValue<String, String> event = new KeyValue<>("ID123-1", "ID123-A1");
final KeyValue<String, String> eventError = new KeyValue<>("ID123-2-ERR", "ID123-A2");

final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
final StreamsBuilder builder = new StreamsBuilder();
final AtomicBoolean isExecuted = new AtomicBoolean(false);
builder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.map(KeyValue::new)
.mapValues(value -> value)
.process(runtimeErrorProcessorSupplierMock())
.map((k, v) -> {
isExecuted.set(true);
return KeyValue.pair(k, v);
})
.process(processor);

final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertTrue(e.getMessage().contains("Exception caught in process. "
+ "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, "
+ "partition=0, offset=1, stacktrace=java.lang.RuntimeException: "
+ "Exception should be handled by processing exception handler"));
assertFalse(isExecuted.get());
}
}

@Test
public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler() {
final KeyValue<String, String> event = new KeyValue<>("ID123-1", "ID123-A1");
final KeyValue<String, String> eventFalse = new KeyValue<>("ID123-2-ERR", "ID123-A2");

final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
final StreamsBuilder builder = new StreamsBuilder();
final AtomicBoolean isExecuted = new AtomicBoolean(false);
builder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.map(KeyValue::new)
.mapValues(value -> value)
.process(runtimeErrorProcessorSupplierMock())
.map((k, v) -> {
isExecuted.set(true);
return KeyValue.pair(k, v);
})
.process(processor);

final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
assertTrue(isExecuted.get());
isExecuted.set(false);
inputTopic.pipeInput(eventFalse.key, eventFalse.value, Instant.EPOCH);
assertFalse(isExecuted.get());
}
}

@Test
public void shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() {
final KeyValue<String, String> event = new KeyValue<>("ID123-1", "ID123-A1");
final KeyValue<String, String> eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2");

final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
final StreamsBuilder builder = new StreamsBuilder();
final AtomicBoolean isExecuted = new AtomicBoolean(false);
builder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.map(KeyValue::new)
.mapValues(value -> value)
.process(runtimeErrorProcessorSupplierMock())
.map((k, v) -> {
isExecuted.set(true);
return KeyValue.pair(k, v);
})
.process(processor);

final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertEquals("KABOOM!", e.getCause().getMessage());
assertFalse(isExecuted.get());
}
}

@Test
public void shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler() {
final KeyValue<String, String> event = new KeyValue<>("ID123-1", "ID123-A1");
final KeyValue<String, String> eventError = new KeyValue<>("ID123-ERR-FATAL", "ID123-A2");

final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
final StreamsBuilder builder = new StreamsBuilder();
final AtomicBoolean isExecuted = new AtomicBoolean(false);
builder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.map(KeyValue::new)
.mapValues(value -> value)
.process(runtimeErrorProcessorSupplierMock())
.map((k, v) -> {
isExecuted.set(true);
return KeyValue.pair(k, v);
})
.process(processor);

final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueProcessingExceptionHandlerMockTest.class);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertEquals("KABOOM!", e.getCause().getMessage());
assertFalse(isExecuted.get());
}
}

public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
throw new RuntimeException("KABOOM!");
}
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
}
Expand All @@ -165,6 +308,9 @@ public void configure(final Map<String, ?> configs) {
public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
throw new RuntimeException("KABOOM!");
}
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerRe
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());

final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext));
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext, false));

final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
Expand All @@ -116,7 +117,7 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());

final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext));
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, false));

assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
}
Expand Down Expand Up @@ -146,6 +147,21 @@ public void shouldNotHandleInternalExceptionsThrownDuringProcessing(final String
verify(processingExceptionHandler, never()).handle(any(), any(), any());
}

@Test
public void shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());

final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true));

final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));

assertInstanceOf(RuntimeException.class, failedProcessingException.getCause());
assertEquals("KABOOM!", failedProcessingException.getCause().getMessage());
}

private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> {
@Override
public void init(final ProcessorContext<Object, Object> context) {
Expand Down Expand Up @@ -323,10 +339,14 @@ public static class ProcessingExceptionHandlerMock implements ProcessingExceptio
private final ProcessingExceptionHandler.ProcessingHandlerResponse response;
private final InternalProcessorContext<Object, Object> internalProcessorContext;

private final boolean shouldThrowException;

public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response,
final InternalProcessorContext<Object, Object> internalProcessorContext) {
final InternalProcessorContext<Object, Object> internalProcessorContext,
final boolean shouldThrowException) {
this.response = response;
this.internalProcessorContext = internalProcessorContext;
this.shouldThrowException = shouldThrowException;
}

@Override
Expand All @@ -338,9 +358,12 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertTrue(exception instanceof RuntimeException);
assertInstanceOf(RuntimeException.class, exception);
assertEquals("Processing exception should be caught and handled by the processing exception handler.", exception.getMessage());

if (shouldThrowException) {
throw new RuntimeException("KABOOM!");
}
return response;
}

Expand Down

0 comments on commit 0dc9b9e

Please sign in to comment.