Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: 3.8.0
Fix Version/s: None
Description
With the official Kafka Java client, version 3.8.0, the position of consumers after a transaction aborts appears unpredictable. Sometimes the consumer moves on, skipping over the records it polled in the aborted transaction. Sometimes it rewinds to read them again. Sometimes it rewinds further than the most recent transaction.
Since the goal of transactions is to enable "exactly-once semantics", it seems sensible that the consumer should rewind on abort, such that any subsequent transactions would start at the same offsets. Not rewinding leads to data loss, since messages are consumed but their effects are not committed. Rewinding too far is... just weird.
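To make the expected behavior concrete, here is a toy model of a single consumer's position on one topic-partition. It is only a sketch of the "rewind on abort" semantics described above, not Kafka's API: the class `AbortRewindModel` and its methods are hypothetical names invented for illustration.

```java
/** Toy model (hypothetical, not a Kafka class) of one consumer's position on one topic-partition. */
final class AbortRewindModel {
    private long position;   // next offset a poll would return
    private long committed;  // position recorded by the last committed transaction

    AbortRewindModel(long startOffset) {
        this.position = startOffset;
        this.committed = startOffset;
    }

    /** Simulates polling `count` records; returns the first offset read. */
    long poll(int count) {
        long first = position;
        position += count;
        return first;
    }

    /** Transaction committed: the polled offsets become the new committed position. */
    void commitTransaction() {
        committed = position;
    }

    /** Transaction aborted: rewind so the next transaction starts at the same offset. */
    void abortTransaction() {
        position = committed;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        AbortRewindModel c = new AbortRewindModel(35);
        long first = c.poll(1);   // reads offset 35
        c.abortTransaction();     // rewinds to 35
        long second = c.poll(1);  // reads offset 35 again
        c.commitTransaction();
        System.out.println(first + " " + second + " " + c.position()); // prints "35 35 36"
    }
}
```

Under this model a record is never skipped: every offset polled in an aborted transaction is polled again by the next one.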
I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other Kafka-compatible systems. It occurs without faults, and with a single producer and consumer; no other concurrent processes. Here's the producer and consumer config:
Producer config:
{"socket.connection.setup.timeout.max.ms" 1000,
 "transactional.id" "jt1",
 "bootstrap.servers" "n3:9092",
 "request.timeout.ms" 3000,
 "enable.idempotence" true,
 "max.block.ms" 10000,
 "value.serializer" "org.apache.kafka.common.serialization.LongSerializer",
 "retries" 1000,
 "key.serializer" "org.apache.kafka.common.serialization.LongSerializer",
 "socket.connection.setup.timeout.ms" 500,
 "reconnect.backoff.max.ms" 1000,
 "delivery.timeout.ms" 10000,
 "acks" "all",
 "transaction.timeout.ms" 1000}
Consumer config:
{"socket.connection.setup.timeout.max.ms" 1000,
 "bootstrap.servers" "n5:9092",
 "request.timeout.ms" 10000,
 "connections.max.idle.ms" 60000,
 "session.timeout.ms" 6000,
 "heartbeat.interval.ms" 300,
 "key.deserializer" "org.apache.kafka.common.serialization.LongDeserializer",
 "group.id" "jepsen-group",
 "metadata.max.age.ms" 60000,
 "auto.offset.reset" "earliest",
 "isolation.level" "read_committed",
 "socket.connection.setup.timeout.ms" 500,
 "value.deserializer" "org.apache.kafka.common.serialization.LongDeserializer",
 "enable.auto.commit" false,
 "default.api.timeout.ms" 10000}
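For readers reproducing this from plain Java rather than Clojure, the consumer settings most relevant to the question could be expressed as a `java.util.Properties` object, which the Java client's `KafkaConsumer` constructor accepts. This is a partial sketch: the class name `ConsumerConfigSketch` is hypothetical, only a subset of the keys above is shown, and the Kafka client itself is deliberately not imported so the snippet runs on the standard library alone.

```java
import java.util.Properties;

/** Hypothetical helper: builds a subset of the consumer config from this report. */
final class ConsumerConfigSketch {
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "n5:9092");
        p.setProperty("group.id", "jepsen-group");
        // Only records from committed transactions should be visible to polls.
        p.setProperty("isolation.level", "read_committed");
        // Offsets are committed explicitly, never in the background.
        p.setProperty("enable.auto.commit", "false");
        // With no committed offset, start from the beginning of the partition.
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.LongDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.LongDeserializer");
        return p;
    }

    public static void main(String[] args) {
        // In a real client these properties would be passed to new KafkaConsumer<>(props).
        Properties props = consumerProps();
        System.out.println(props.getProperty("isolation.level"));
    }
}
```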
Attached is a test run that shows this behavior, as well as a visualization of the reads (polls) and writes (sends) of a single topic-partition.
In this plot, time flows down, and offsets run left to right. Each transaction is a single horizontal line. `w1` denotes a send of value 1, and `r2` denotes a poll that reads value 2. All operations here are performed by the sole process in the system, which has a single Kafka consumer and a single Kafka producer. First, a transaction writes 35 and commits. Second, a transaction reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer has clearly rewound to show the same record twice.
Then a transaction writes 37. Immediately thereafter a transaction reads 37 and 38. Unlike before, it did not rewind. This transaction also aborts.
Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40. This transaction commits! Values 35, 37, and 38 have been lost!
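The loss described here can be checked mechanically. The following is a minimal sketch, not the Jepsen checker itself: it assumes values increase with offset (as they do in this history), and the names `LostReadChecker`, `Txn`, and `lostValues` are hypothetical. A value counts as lost if it was polled only by aborted transactions while some committed transaction polled a higher value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical checker for values that were polled but never read by a committed transaction. */
final class LostReadChecker {
    /** The values one transaction polled, and whether that transaction committed. */
    static final class Txn {
        final boolean committed;
        final List<Long> reads;

        Txn(boolean committed, Long... reads) {
            this.committed = committed;
            this.reads = Arrays.asList(reads);
        }
    }

    /** Assumes values increase with offset, so a higher committed read implies a skip. */
    static List<Long> lostValues(List<Txn> history) {
        TreeSet<Long> committedReads = new TreeSet<>();
        TreeSet<Long> abortedReads = new TreeSet<>();
        for (Txn t : history) {
            (t.committed ? committedReads : abortedReads).addAll(t.reads);
        }
        List<Long> lost = new ArrayList<>();
        if (committedReads.isEmpty()) {
            return lost; // nothing committed yet, so nothing has been skipped
        }
        long highestCommitted = committedReads.last();
        for (long v : abortedReads) {
            if (v < highestCommitted && !committedReads.contains(v)) {
                lost.add(v);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // The polling transactions from the history described above.
        List<Txn> history = Arrays.asList(
                new Txn(false, 35L),       // reads 35, aborts
                new Txn(false, 35L),       // reads 35 again, aborts
                new Txn(false, 37L, 38L),  // reads 37 and 38, aborts
                new Txn(true, 39L, 40L));  // reads 39 and 40, commits
        System.out.println(lostValues(history)); // prints [35, 37, 38]
    }
}
```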
It doesn't seem possible that this is the effect of a consumer rebalance: rebalancing should start off the consumer at the last committed offset, and the last committed offset in this history was actually value 31, so it should have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so if the commit were somehow missing, it should have rewound to the start of the topic-partition.
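The reasoning above can be written as a small decision function. This is a sketch of the expected resume position after a rebalance, not Kafka code: `ResumePositionModel` and `resumeOffset` are hypothetical names, and the offsets in `main` are made up for illustration because the report gives values rather than offsets.

```java
import java.util.OptionalLong;

/** Hypothetical model of where a consumer is expected to resume after a rebalance. */
final class ResumePositionModel {
    static long resumeOffset(OptionalLong committed, String autoOffsetReset,
                             long logStart, long logEnd) {
        // A committed offset always wins over the reset policy.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to auto.offset.reset.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Illustrative offsets only: log spans [0, 41), committed offset 32.
        System.out.println(resumeOffset(OptionalLong.of(32), "earliest", 0, 41));  // prints 32
        System.out.println(resumeOffset(OptionalLong.empty(), "earliest", 0, 41)); // prints 0
    }
}
```

Neither branch lands the consumer past records it has never read in a committed transaction, which is why a rebalance does not explain the skip.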
What... should Kafka do with respect to consumer offsets when a transaction aborts? And is there any sort of documentation for this? I've been digging into this problem for almost a week (it manifested as write loss in a Jepsen test), and I'm baffled as to how to proceed.