Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.8.0
Fix Version/s: None
Description
In short: I believe that both an internal retry mechanism and guidance from the example code and client API docs are independently capable of causing committed transactions to actually abort, aborted transactions to actually commit, and transactions to be split into multiple parts with different fates. A delayed EndTxn message could arrive seconds later and decide the fate of an unrelated transaction.
Consider the attached Lamport diagram, reconstructed from node logs and packet captures in a recent Jepsen test. In it, a single process, using a single producer and consumer, executes a series of transactions which all commit or abort cleanly. Process 76 selected the unique transactional ID `jt1234` on initialization.
From packet captures and debug logs, we see `jt1234` used producer ID `233`, submitted all four operations, then sent an `EndTxn` message with `committed = false`, which denotes a transaction abort. However, fifteen separate calls to `poll` observed this transaction's write of `424` to key `5`---an obvious case of aborted read (G1a). Even stranger, no poller observed the other writes from this transaction: key `17` apparently never received values `926` or `927`. Why?
From the packet capture and logs: process 76 began a transaction which sent `1018` to key `15`. It sent an `EndTxn` message to node `n3` to commit that transaction. However, it did not receive a prompt response. The client then quietly sent a second commit message to `n4`, which returned successfully; the test harness's call to `commitTransaction` completed successfully. The process then performed and intentionally aborted a second transaction; this completed OK. So far, so good.
Then process 76 began our problematic transaction. It sent `424` to key `5`, and added new partitions to the transaction. Just after accepting record `424`, node `n3` received the delayed commit message from two transactions earlier. It committed the current transaction, effectively chopping it in half. The first half (record `424`) was committed and visible to pollers. The second half, sending `926` and `927` to key `17`, implicitly began a second transaction, which was aborted by the client.
This suggests a fundamental problem in the Kafka transaction protocol. The protocol is intentionally designed to allow clients to submit requests over multiple TCP connections and to distribute them across multiple nodes. There is no sequence number to order requests from the same client. There is no concept of a transaction number. When a server receives a commit (or abort) message, it has no way to know what transaction the client intended to commit. It simply commits or aborts whatever transaction happens to be in progress.
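To make the failure mode concrete, here is a minimal toy model of the sequence described above. It is only a sketch: `ToyCoordinator`, `replay`, the epoch value `0`, and the single-coordinator simplification (the real trace involved `n3` and `n4`) are all assumptions for illustration, not Kafka's actual implementation. The report does not say what the second transaction wrote, so it is left empty here.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCoordinator:
    """Toy stand-in for a transaction coordinator (not Kafka's real logic)."""
    open_writes: list = field(default_factory=list)  # in-progress transaction
    visible: list = field(default_factory=list)      # committed, seen by pollers
    discarded: list = field(default_factory=list)    # aborted

    def write(self, key, value):
        # Any write implicitly joins (or starts) the in-progress transaction.
        self.open_writes.append((key, value))

    def end_txn(self, producer_id, epoch, committed):
        # The request names a producer ID and epoch but no particular
        # transaction, so it applies to whatever happens to be open now.
        target = self.visible if committed else self.discarded
        target.extend(self.open_writes)
        self.open_writes = []


def replay():
    coord = ToyCoordinator()
    pid, epoch = 233, 0  # producer ID from the report; epoch is assumed

    # Transaction 1: the first commit message is delayed in the network,
    # and the client's retry is the one that takes effect.
    coord.write(15, 1018)
    delayed_commit = (pid, epoch, True)
    coord.end_txn(pid, epoch, committed=True)

    # Transaction 2: intentionally aborted (contents not given in the report).
    coord.end_txn(pid, epoch, committed=False)

    # Transaction 3: the delayed commit lands just after the first write.
    coord.write(5, 424)
    coord.end_txn(*delayed_commit)
    coord.write(17, 926)
    coord.write(17, 927)
    coord.end_txn(pid, epoch, committed=False)  # the client's intended abort
    return coord


if __name__ == "__main__":
    result = replay()
    print("visible:", result.visible)
    print("discarded:", result.discarded)
```

In this model the write of `424` to key `5` ends up visible while `926` and `927` are discarded, matching the split observed in the test.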
This means transactions which appeared to commit could actually abort, and vice versa: we observed both aborted reads and lost writes. It also means transactions could get chopped into smaller pieces: one could lose some, but not all, of a transaction's effects.
What does it take to get this behavior? First, an `EndTxn` message must be delayed---for instance due to network latency, packet loss, a slow computer, garbage collection, etc. Second, while that `EndTxn` arrow is hovering in the air, the client needs to move on to perform a second transaction using the same producer ID and epoch. There are several ways this could happen.
First, users could explicitly retry committing or aborting a transaction. The docs say they can, and the client won't stop them.
Second, the official Kafka Java client docs repeatedly instruct users to call `abortTransaction` if an error occurs during `commitTransaction`. The provided example code leads directly to this behavior: if `commitTransaction` times out, it calls `abortTransaction`, and voilà: the client can move on to later operations. The only exceptions in the docs are `ProducerFencedException`, `OutOfOrderSequenceException`, and `AuthorizationException`, none of which apply here.
I've tried to avoid this problem by ensuring that transactions either commit once, or abort once, never both. Sadly, this doesn't work. Indeed, process 76 in this test run never tried to abort a transaction after calling commit---and even though it only called `commitTransaction` once, it sent two commit messages to two different nodes. I suspect this is because the Java client treats timeouts as retriable (https://github.com/apache/kafka/blob/8125c3da5bb6ebb35a0cb3494624d33fad4e3187/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java#L22), and the transaction manager appears to perform its own internal retries.
I'm not entirely sure how to fix this: the protocol feels like it's missing critical ordering information, and you can't rely on e.g. TCP ordering, because it's multi-stream. One option might be to force the producer to acquire a new epoch if it ever encounters an indefinite result from an `EndTxn` message. The producer fencing mechanism would then prevent any delayed `EndTxn` messages from being processed, right?
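As a rough illustration of that idea, the sketch below replays the same sequence against a toy coordinator that rejects any `EndTxn` whose epoch is not current. Everything here is hypothetical: `FencingCoordinator`, `bump_epoch`, and `replay_with_fencing` are made-up names, and the model ignores what a real epoch bump would do to a transaction that is still open. It only shows why a stale epoch on the delayed message would be enough to reject it.

```python
class FencingCoordinator:
    """Toy coordinator that fences EndTxn requests carrying a stale epoch."""

    def __init__(self):
        self.epoch = 0
        self.open_writes = []  # in-progress transaction
        self.visible = []      # committed, seen by pollers
        self.discarded = []    # aborted

    def bump_epoch(self):
        # Hypothetical step: the producer acquires a new epoch after an
        # indefinite EndTxn result (e.g. a timeout).
        self.epoch += 1
        return self.epoch

    def write(self, key, value):
        self.open_writes.append((key, value))

    def end_txn(self, epoch, committed):
        # Reject requests from an older epoch instead of applying them
        # to whatever transaction is currently open.
        if epoch != self.epoch:
            return False
        target = self.visible if committed else self.discarded
        target.extend(self.open_writes)
        self.open_writes = []
        return True


def replay_with_fencing():
    coord = FencingCoordinator()
    epoch = coord.epoch

    # Transaction 1: the first commit gets no prompt response; the retry works.
    coord.write(15, 1018)
    delayed_commit = (epoch, True)
    coord.end_txn(epoch, committed=True)
    # The first attempt was indefinite, so the producer takes a new epoch.
    epoch = coord.bump_epoch()

    # Transaction 2: intentionally aborted (contents not given in the report).
    coord.end_txn(epoch, committed=False)

    # Transaction 3: the delayed commit arrives mid-transaction.
    coord.write(5, 424)
    delayed_accepted = coord.end_txn(*delayed_commit)
    coord.write(17, 926)
    coord.write(17, 927)
    coord.end_txn(epoch, committed=False)  # the client's intended abort
    return coord, delayed_accepted


if __name__ == "__main__":
    coord, accepted = replay_with_fencing()
    print("delayed commit accepted:", accepted)
    print("visible:", coord.visible)
    print("discarded:", coord.discarded)
```

Under these assumptions the delayed commit is rejected, and all three writes of the final transaction are aborted together, as the client intended.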