Kafka / KAFKA-10829

Kafka Streams handle produce exception improvement


Details

    Description

      A summary of some recent discussions on how we should improve the embedded producer's exception handling.

      Note that in each case below, the baseline logic guarantees that our correctness semantics are not violated; the optimizations sit on top of the baseline and reduce the user's burden by letting the library automatically handle certain types of exceptions.

      1) ``Producer.send()`` throws an exception directly:

      1.a) The baseline logic (to guarantee correctness) is to always wrap the exception as a StreamsException, which causes the thread to shut down and triggers the exception handler. The handler can inspect the wrapped exception and decide whether the shut-down thread can be restarted.

      1.b) The optimization is to inspect the exception and decide whether it can be wrapped as a TaskMigratedException instead (e.g. ProducerFenced). This would then be handled automatically by treating all tasks as lost and re-joining the group.
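      A minimal sketch of 1.a and 1.b, assuming simplified local stand-ins for the Kafka exception types (the real ones live in the ``org.apache.kafka.common``, ``org.apache.kafka.common.errors``, and ``org.apache.kafka.streams.errors`` packages). The ``translate`` and ``send`` helpers are hypothetical, and passing the send as a ``Runnable`` is a simplification of the real ``Producer.send()`` call.

```java
// Simplified stand-ins named after the Kafka types; not the real classes.
final class SendErrorTranslator {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // As in Kafka Streams, TaskMigratedException is a subtype of StreamsException.
    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Hypothetical helper. Baseline (1.a): wrap everything as a fatal StreamsException.
    // Optimization (1.b): a fenced producer becomes TaskMigratedException, which the
    // library handles by treating all tasks as lost and re-joining the group.
    static StreamsException translate(KafkaException e, boolean optimized) {
        if (optimized && e instanceof ProducerFencedException) {
            return new TaskMigratedException("Producer fenced; the task may have migrated", e);
        }
        return new StreamsException("Error sending record", e);
    }

    // Hypothetical send wrapper: exceptions thrown directly by send() are translated.
    static void send(Runnable producerSend, boolean optimized) {
        try {
            producerSend.run();
        } catch (KafkaException e) {
            throw translate(e, optimized);
        }
    }
}
```

      Keeping the original exception as the cause lets the thread's exception handler inspect it when deciding whether the thread can be restarted.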

      2) The ``Producer.send()`` callback receives an exception:

      2.a) The baseline is to first check whether the exception is an instance of RetriableException.

      If it is not retriable, pass it to the production exception handler, which decides whether to throw or to continue with the record dropped. If the handler decides to throw, always wrap the exception as a StreamsException and keep it locally, and stop sending further records from the caller. On the next send call, check for the remembered exception and throw it, which causes the thread to shut down and triggers the exception handler.

      If the exception is retriable, always throw it as a fatal StreamsException.
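      A minimal sketch of the 2.a "remember and rethrow" behavior, assuming that retriable errors are the ones thrown as fatal and non-retriable ones go to the handler, and assuming the send callback reports failures through a hypothetical ``onSendError`` method. ``RetriableException``, ``StreamsException``, and the single-method ``ProductionHandler`` are simplified stand-ins (the real ``ProductionExceptionHandler`` also receives the failed record). Only the first remembered error is kept, and the next ``send`` rethrows it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-ins; not the real Kafka Streams classes.
final class RecordCollectorSketch {
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    enum HandlerResponse { CONTINUE, FAIL }

    // Hypothetical single-method handler; the real one also receives the failed record.
    interface ProductionHandler {
        HandlerResponse handle(Exception e);
    }

    // Callbacks run on the producer's I/O thread, so shared state is atomic.
    private final AtomicReference<StreamsException> sendException = new AtomicReference<>();
    final AtomicInteger droppedRecords = new AtomicInteger();
    private final ProductionHandler handler;

    RecordCollectorSketch(ProductionHandler handler) {
        this.handler = handler;
    }

    // Hypothetical hook invoked from the send() callback when a send fails.
    void onSendError(Exception e) {
        if (e instanceof RetriableException) {
            // Assumed baseline: retriable errors are always fatal.
            sendException.compareAndSet(null, new StreamsException("Retriable send error", e));
        } else if (handler.handle(e) == HandlerResponse.FAIL) {
            // Non-retriable and the handler chose FAIL: remember it instead of throwing here.
            sendException.compareAndSet(null, new StreamsException("Handler chose FAIL", e));
        } else {
            // Handler chose CONTINUE: the record is dropped.
            droppedRecords.incrementAndGet();
        }
    }

    // Rethrow the first remembered error, so no further records are sent.
    void checkForException() {
        StreamsException e = sendException.get();
        if (e != null) {
            throw e;
        }
    }

    void send(Runnable producerSend) {
        checkForException();
        producerSend.run();
    }
}
```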

      2.b) Optimization one: if the non-retriable exception can be translated into a TaskMigratedException, do not wrap it as a StreamsException; let the library handle it internally.

      2.c) Optimization two: if the retriable exception is a TimeoutException, do not pass it to the production exception handler; treat it as TaskMigrated instead.
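      Layering 2.b and 2.c on top of the 2.a baseline gives a classification like the following minimal sketch. It returns the exception to remember, or ``null`` when the record is dropped. All exception classes are simplified stand-ins, ``ProducerFencedException`` is assumed as the example of a translatable non-retriable error, and ``classify`` is a hypothetical name.

```java
import java.util.function.Function;

// Simplified stand-ins; not the real Kafka classes.
final class CallbackClassifier {
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    // As in Kafka, TimeoutException is a RetriableException.
    static class TimeoutException extends RetriableException {
        TimeoutException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends RuntimeException {
        ProducerFencedException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String msg, Throwable cause) { super(msg, cause); }
    }

    enum HandlerResponse { CONTINUE, FAIL }

    // Returns the exception to remember for the next send, or null if the record is dropped.
    static StreamsException classify(Exception e, Function<Exception, HandlerResponse> handler) {
        if (e instanceof RetriableException) {
            if (e instanceof TimeoutException) {
                // 2.c: skip the handler and let the library handle it as TaskMigrated.
                return new TaskMigratedException("Send timed out", e);
            }
            // Assumed baseline: other retriable errors are fatal.
            return new StreamsException("Retriable send error", e);
        }
        if (e instanceof ProducerFencedException) {
            // 2.b: translatable, so it is not wrapped as a fatal StreamsException.
            return new TaskMigratedException("Producer fenced", e);
        }
        // Baseline: the handler decides between FAIL and CONTINUE.
        return handler.apply(e) == HandlerResponse.FAIL
                ? new StreamsException("Handler chose FAIL", e)
                : null;
    }
}
```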

      3) ``Producer.XXXTxn`` APIs other than ``abortTxn`` throw an exception directly:

      3.a) The baseline logic is to catch all KafkaExceptions except TimeoutException and handle them as TaskCorrupted (which includes aborting the transaction, resetting the state, and re-joining the group). TimeoutException is rethrown.

      3.b) Optimization: some exceptions can be handled as TaskMigrated, which is a lighter-weight recovery.
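      A minimal sketch of 3.a and 3.b for a transactional call other than the abort (e.g. a commit), assuming simplified stand-in exception classes and a hypothetical ``runTxnCall`` wrapper. ``ProducerFencedException`` is assumed as an example of an error that qualifies for the lighter TaskMigrated path.

```java
// Simplified stand-ins; not the real Kafka classes.
final class TxnCallWrapper {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    // As in Kafka, TimeoutException is a KafkaException.
    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Recovery: abort the transaction, reset the state, and re-join the group.
    static class TaskCorruptedException extends StreamsException {
        TaskCorruptedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Lighter recovery: treat all tasks as lost and re-join the group.
    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Hypothetical wrapper around any transactional call other than the abort.
    static void runTxnCall(Runnable txnCall) {
        try {
            txnCall.run();
        } catch (TimeoutException e) {
            throw e; // 3.a: timeouts are rethrown unchanged
        } catch (ProducerFencedException e) {
            throw new TaskMigratedException("Fenced during transactional call", e); // 3.b
        } catch (KafkaException e) {
            throw new TaskCorruptedException("Transactional call failed", e); // 3.a
        }
    }
}
```

      The more specific catch clauses must come before ``KafkaException``, since both stand-ins are subclasses of it.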

      4) ``Producer.abortTxn`` throws an exception:

      4.a) The baseline logic is to catch all KafkaExceptions except TimeoutException and treat them as a fatal StreamsException. TimeoutException is rethrown.

      4.b) Optimization: some exceptions can be ignored (e.g. invalidTxnTransition means the abort did not succeed).
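      A minimal sketch of the baseline and optimization for ``Producer.abortTxn``, assuming simplified stand-in exception classes. ``InvalidTxnTransitionException`` is a hypothetical class standing in for the invalidTxnTransition case, and ``abortTxn`` here is a hypothetical wrapper, not the producer method itself.

```java
// Simplified stand-ins; not the real Kafka classes.
final class AbortTxnWrapper {
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }

    // Hypothetical type for the invalidTxnTransition case.
    static class InvalidTxnTransitionException extends KafkaException {
        InvalidTxnTransitionException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Hypothetical wrapper around the producer's abort call.
    static void abortTxn(Runnable abortCall) {
        try {
            abortCall.run();
        } catch (TimeoutException e) {
            throw e; // Baseline: timeouts are rethrown unchanged
        } catch (InvalidTxnTransitionException e) {
            // Optimization: ignored, so nothing is propagated to the caller.
        } catch (KafkaException e) {
            throw new StreamsException("Aborting the transaction failed", e); // Baseline: fatal
        }
    }
}
```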


            People

              Assignee: Unassigned
              Reporter: Guozhang Wang
              Votes: 0
              Watchers: 3
