Kafka / KAFKA-14799

Source tasks fail if connector attempts to abort empty transaction


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0, 3.4.0, 3.3.1, 3.3.2
    • Fix Version/s: 3.5.0, 3.4.1, 3.3.3
    • Component/s: connect
    • Labels: None

    Description

      If a source task invokes TransactionContext::abortTransaction while the current transaction is empty, and then returns an empty batch of records from the next (or current) invocation of SourceTask::poll, the task will fail.

      This happens because the Connect framework honors the abort request by invoking KafkaProducer::abortTransaction without first having invoked KafkaProducer::beginTransaction (no records had been received from the task, so no transaction was ever started). The producer is therefore still in the READY state, and the attempted transition to ABORTING_TRANSACTION fails with an IllegalStateException.
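To make the failure mode concrete, here is a minimal, self-contained Java model of the sequence described above. It is not Kafka code: TxnState, ModelProducer, ModelFramework and EmptyAbortDemo are hypothetical names. The producer model keeps only the three states relevant here (READY, IN_TRANSACTION, ABORTING_TRANSACTION), and the framework model assumes, as the description states, that a transaction is only begun once the task returns records, while an abort request is forwarded to the producer unconditionally.

```java
import java.util.List;

// Simplified, hypothetical model of a transactional producer's state machine.
// Only the states relevant to this issue are kept; the real producer tracks
// many more states and checks.
enum TxnState { READY, IN_TRANSACTION, ABORTING_TRANSACTION }

class ModelProducer {
    private TxnState state = TxnState.READY;

    TxnState state() {
        return state;
    }

    void beginTransaction() {
        transitionTo(TxnState.IN_TRANSACTION);
    }

    void abortTransaction() {
        transitionTo(TxnState.ABORTING_TRANSACTION);
        // Assumption: the abort completes immediately, returning the producer to READY.
        state = TxnState.READY;
    }

    private void transitionTo(TxnState target) {
        boolean valid;
        if (target == TxnState.IN_TRANSACTION) {
            valid = state == TxnState.READY;
        } else if (target == TxnState.ABORTING_TRANSACTION) {
            valid = state == TxnState.IN_TRANSACTION;
        } else {
            valid = true;
        }
        if (!valid) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }
}

// Hypothetical model of the framework side: a transaction is begun lazily,
// only for a non-empty batch, but a requested abort is always forwarded.
class ModelFramework {
    private final ModelProducer producer = new ModelProducer();
    private boolean transactionOpen = false;

    ModelProducer producer() {
        return producer;
    }

    void dispatch(List<String> batch, boolean abortRequested) {
        if (!batch.isEmpty() && !transactionOpen) {
            producer.beginTransaction();
            transactionOpen = true;
        }
        // Sending the records themselves is omitted from this model.
        if (abortRequested) {
            producer.abortTransaction(); // throws if no transaction was begun
            transactionOpen = false;
        }
    }
}

class EmptyAbortDemo {
    public static void main(String[] args) {
        // Abort after a non-empty batch: beginTransaction ran first, so this succeeds.
        new ModelFramework().dispatch(List.of("record"), true);

        // Abort after an empty batch: beginTransaction never ran, so this fails.
        try {
            new ModelFramework().dispatch(List.of(), true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running EmptyAbortDemo prints "Invalid transition attempted from state READY to state ABORTING_TRANSACTION", the same transition reported in the stack trace below.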

      An example stack trace for this scenario:

      [2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
      java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
          at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
          at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
          at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
          at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
          at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
          at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
          at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
          at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
          at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
          at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)


      As far as a fix goes, we have a few options:

      1. Gracefully handle this case by translating the call to TransactionContext::abortTransaction into a no-op.
      2. Throw an exception (probably an IllegalStateException) from TransactionContext::abortTransaction. This would fail the task by default, but would let the task catch the exception and continue processing if it chooses.
      3. Forcibly fail the task without giving it the chance to swallow an exception, using a strategy similar to how we fail tasks that request that a transaction be both committed and aborted for the same record (see here).
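Options 1 and 3 can both be decided on the framework side, at the point where it knows whether a transaction was actually begun. The sketch below is a hypothetical Java model of that check (EmptyAbortPolicy, GuardedBoundaryModel and EmptyAbortPolicyDemo are illustrative names, not Connect runtime classes); it does not model option 2, which would instead throw from TransactionContext::abortTransaction itself, and it does not reflect which option was ultimately adopted.

```java
import java.util.List;

// Hypothetical policies corresponding to options 1 and 3 in the list above.
enum EmptyAbortPolicy {
    NO_OP,    // option 1: ignore the abort request
    FAIL_TASK // option 3: fail the task from the framework side
}

// Hypothetical model of framework-side transaction boundary handling that
// checks whether a transaction was actually begun before aborting it.
class GuardedBoundaryModel {
    private final EmptyAbortPolicy policy;
    private boolean transactionOpen = false; // stands in for "beginTransaction was called"
    private int abortsSentToProducer = 0;    // stands in for KafkaProducer::abortTransaction calls

    GuardedBoundaryModel(EmptyAbortPolicy policy) {
        this.policy = policy;
    }

    int abortsSentToProducer() {
        return abortsSentToProducer;
    }

    void dispatch(List<String> batch, boolean abortRequested) {
        if (!batch.isEmpty() && !transactionOpen) {
            transactionOpen = true; // the framework would call beginTransaction here
        }
        if (!abortRequested) {
            return;
        }
        if (transactionOpen) {
            // Non-empty transaction: aborting through the producer is valid.
            abortsSentToProducer++;
            transactionOpen = false;
            return;
        }
        // The transaction is empty: beginTransaction was never called.
        if (policy == EmptyAbortPolicy.FAIL_TASK) {
            // Thrown from the framework's dispatch path, so the task never sees it.
            throw new IllegalStateException(
                "Task requested that an empty transaction be aborted");
        }
        // NO_OP: nothing to abort, so the producer is never called.
    }
}

class EmptyAbortPolicyDemo {
    public static void main(String[] args) {
        GuardedBoundaryModel noOp = new GuardedBoundaryModel(EmptyAbortPolicy.NO_OP);
        noOp.dispatch(List.of(), true);         // ignored, no producer call
        noOp.dispatch(List.of("record"), true); // real abort of a non-empty transaction
        System.out.println(noOp.abortsSentToProducer()); // prints 1
    }
}
```

The trade-off between the two modeled policies is visible in the last branch: NO_OP keeps the task running silently, while FAIL_TASK surfaces what is likely a logic error in the connector at the cost of killing the task.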

            People

              Assignee: Chris Egerton
              Reporter: Chris Egerton
              Votes: 0
              Watchers: 1
