Description
If a source task invokes TransactionContext::abortTransaction while the current transaction is empty, and then returns an empty batch of records from the next (or current) invocation of SourceTask::poll, the task will fail.
This happens because the Connect framework honors the abort request by invoking KafkaProducer::abortTransaction without having first invoked KafkaProducer::beginTransaction (no records were received from the task, so no transaction was ever begun). The producer is therefore still in the READY state, and its attempted transition to ABORTING_TRANSACTION fails with an IllegalStateException.
An example stack trace for this scenario:
[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] ExactlyOnceWorkerSourceTask{id=exactlyOnceQuestionMark-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
java.lang.IllegalStateException: TransactionalId exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
    at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
    at org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
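To make the failure mode concrete, here is a self-contained toy model of the scenario. It is a hypothetical sketch, not Connect or producer code: ToyProducer, ToyWorker and ToyTxnState are illustrative names, the state machine keeps only the three states relevant here (the real producer has more), and the worker assumes, as described above, that a transaction is begun only once the task hands over its first record, while an abort request is honored after every dispatched batch, including an empty one.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified subset of the producer's transaction states.
enum ToyTxnState { READY, IN_TRANSACTION, ABORTING_TRANSACTION }

// Hypothetical toy producer: enforces only the transitions relevant to this bug.
class ToyProducer {
    private ToyTxnState state = ToyTxnState.READY;

    ToyTxnState state() { return state; }

    void beginTransaction() {
        transitionTo(ToyTxnState.IN_TRANSACTION, EnumSet.of(ToyTxnState.READY));
    }

    void send(String value) {
        if (state != ToyTxnState.IN_TRANSACTION) {
            throw new IllegalStateException("Cannot send outside a transaction");
        }
    }

    void abortTransaction() {
        // Aborting is only legal from IN_TRANSACTION in this toy model.
        transitionTo(ToyTxnState.ABORTING_TRANSACTION, EnumSet.of(ToyTxnState.IN_TRANSACTION));
        state = ToyTxnState.READY; // pretend the abort completes immediately
    }

    private void transitionTo(ToyTxnState target, Set<ToyTxnState> allowedFrom) {
        if (!allowedFrom.contains(state)) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }
}

// Hypothetical worker loop: begins a transaction lazily on the first record,
// but honors an abort request after every batch without checking whether
// a transaction was ever begun.
class ToyWorker {
    private final ToyProducer producer = new ToyProducer();
    private boolean transactionOpen = false;

    ToyProducer producer() { return producer; }

    void dispatch(List<String> batch, boolean abortRequested) {
        for (String value : batch) {
            if (!transactionOpen) {
                producer.beginTransaction();
                transactionOpen = true;
            }
            producer.send(value);
        }
        if (abortRequested) {
            // With an empty batch and no earlier records, the producer is still READY here.
            producer.abortTransaction();
            transactionOpen = false;
        }
    }
}
```

Dispatching a non-empty batch with an abort request succeeds, while dispatching an empty batch with an abort request throws an IllegalStateException whose message matches the one in the stack trace.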
As far as a fix goes, we have a few options:
- Gracefully handle this case by translating the call to TransactionContext::abortTransaction into a no-op
- Throw an exception (probably an IllegalStateException) from TransactionContext::abortTransaction, which would fail the task unless the task chooses to catch the exception and continue processing
- Forcibly fail the task without giving it the chance to swallow an exception, using a strategy similar to the one we use to fail tasks that request that a transaction be both committed and aborted for the same record (see here)
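A minimal sketch of the first option, under simplifying assumptions: RecordingProducer and GuardedWorker are hypothetical names rather than Connect classes, the producer stub only tracks whether a transaction has been begun, and the worker begins a transaction lazily on the first record it sends. The no-op check is made when the batch is dispatched rather than when TransactionContext::abortTransaction is called, because the failure only occurs if the batch returned from SourceTask::poll is also empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical producer stub: records calls and, like the real producer,
// rejects an abort when no transaction has been begun.
class RecordingProducer {
    final List<String> calls = new ArrayList<>();
    private boolean inTransaction = false;

    void beginTransaction() {
        calls.add("beginTransaction");
        inTransaction = true;
    }

    void send(String value) {
        calls.add("send");
    }

    void abortTransaction() {
        if (!inTransaction) {
            throw new IllegalStateException(
                "Invalid transition attempted from state READY to state ABORTING_TRANSACTION");
        }
        calls.add("abortTransaction");
        inTransaction = false;
    }
}

// Hypothetical worker implementing option 1: an abort request for an
// empty transaction is silently dropped instead of reaching the producer.
class GuardedWorker {
    final RecordingProducer producer = new RecordingProducer();
    private boolean transactionOpen = false;

    void dispatch(List<String> batch, boolean abortRequested) {
        for (String value : batch) {
            if (!transactionOpen) {
                producer.beginTransaction();
                transactionOpen = true;
            }
            producer.send(value);
        }
        if (abortRequested) {
            if (transactionOpen) {
                producer.abortTransaction();
                transactionOpen = false;
            }
            // Otherwise nothing was sent since the last boundary, so there is nothing to abort.
        }
    }
}
```

Options 2 and 3 would instead surface an error when the transaction is empty, thrown either from TransactionContext::abortTransaction itself (which the task could catch) or by the framework (which the task could not).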