@@ -1511,31 +1511,32 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
15111511> ``` java
15121512> public class DlqProcessingExceptionHandler implements ProcessingExceptionHandler {
15131513>
1514- > private String deadLetterQueueTopic;
1514+ > private String deadLetterQueueTopic;
15151515>
1516- > @Override
1517- > public Response handleError (final ErrorHandlerContext context ,
1518- > final Record<?, ?> record ,
1519- > final Exception exception ) {
1516+ > @Override
1517+ > public Response handleError (final ErrorHandlerContext context ,
1518+ > final Record<?, ?> record ,
1519+ > final Exception exception ) {
15201520>
1521- > // Example: forward the raw record to a DLQ topic
1522- > Record<byte[], byte[]> dlqRecord =
1523- > new Record<> (deadLetterQueueTopic,
1524- > context. sourceRawKey(),
1525- > context. sourceRawValue(),
1526- > context. timestamp());
1521+ > // Example: forward the raw record to a DLQ topic
1522+ > ProducerRecord<byte[], byte[]> dlqRecord =
1523+ > new ProducerRecord<> (deadLetterQueueTopic,
1524+ > null ,
1525+ > context. timestamp(),
1526+ > context. sourceRawKey(),
1527+ > context. sourceRawValue());
15271528>
15281529> // Applications may choose how to construct DLQ records. For example,
15291530> // they may forward the raw key/value bytes, transform the payload,
15301531> // or add headers with error metadata.
15311532> return Response . resume(List . of(dlqRecord));
1532- > }
1533+ > }
15331534>
1534- > @Override
1535- > public void configure (final Map<String , ?> configs ) {
1535+ > @Override
1536+ > public void configure (final Map<String , ?> configs ) {
15361537> // Retrieve the DLQ topic name from the configs map, or any other source
1537- > deadLetterQueueTopic = (String ) configs. get(" my.dlq.topic.config.key" );
1538- > }
1538+ > deadLetterQueueTopic = (String ) configs. get(" my.dlq.topic.config.key" );
1539+ > }
15391540> }
15401541> ```
15411542> To enable the custom exception handler and configure the DLQ topic:
@@ -1544,15 +1545,15 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
15441545> Properties props = new Properties ();
15451546>
15461547> props. put(
1547- > StreamsConfig . PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG ,
1548- > DlqProcessingExceptionHandler . class
1548+ > StreamsConfig . PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG ,
1549+ > DlqProcessingExceptionHandler . class
15491550> );
15501551>
15511552> // Optional: if your custom handler reads the DLQ topic from StreamsConfig,
15521553> // set it here. Otherwise, configure the topic name via your own properties.
15531554> // props.put(
1554- > // StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
1555- > // "dlq-topic"
1555+ > // StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
1556+ > // "dlq-topic"
15561557> // );
15571558> ```
15581559### processing. exception. handler. global. enabled (deprecated)
@@ -2051,5 +2052,3 @@ Admin
20512052 * [Documentation ](/ documentation)
20522053 * [Kafka Streams ](/ documentation/ streams)
20532054 * [Developer Guide ](/ documentation/ streams/ developer- guide/ )
2054-
2055-
0 commit comments