When EOS is turned on, Streams performs the following steps:
1. InitTxn in task creation.
2. BeginTxn in topology initialization.
3. AbortTxn in clean shutdown.
4. CommitTxn in commit(), which is called in suspend() as well.
Now consider this situation, with two threads (Ta and Tb) and one task:
1. Originally Ta owns the task, and the consumer generation is 1.
2. Ta becomes unresponsive and stops sending heartbeats, so it gets kicked out of the group; a new generation 2 is formed with Tb in it. The task is migrated to Tb without Ta knowing.
3. Ta finally calls `consumer.poll` and becomes aware of the rebalance. It re-joins the group, forming a new generation 3, and during the rebalance the leader decides to assign the task back to Ta.
4.a) Ta calls onPartitionsRevoked on the task, suspending it and calling commit. However, if no data has been sent since `BeginTxn`, this commit call is a no-op.
4.b) Ta then calls onPartitionsAssigned on the task, resuming it, and then calls BeginTxn. At this point it incorrectly encounters a ProducerFencedException.
The root cause is that Ta never triggers InitTxn to claim "I'm the newest for this txnId, and am going to fence everyone else with the same txnId", so it is mistakenly treated as an older client than Tb.
Note that this issue is not common: it requires a txn that did not send any data at all, so that its commitTxn call is a no-op and the producer is therefore not fenced earlier on.
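
To make the sequence above concrete, here is a minimal toy model of the fencing behavior. It does not use the real Kafka client: `ToyCoordinator`, `ToyProducer`, `ToyFencedException` and `FencingScenario` are hypothetical names, the per-txnId epoch counter is a simplified stand-in for the coordinator's bookkeeping, and the fence is modeled as surfacing at BeginTxn, as described in step 4.b.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the transaction coordinator: tracks the latest epoch per txnId.
class ToyCoordinator {
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // InitTxn: bump the epoch, which fences every producer holding an older one.
    int initTxn(String txnId) {
        return latestEpoch.merge(txnId, 1, Integer::sum);
    }

    boolean isFenced(String txnId, int epoch) {
        return epoch < latestEpoch.getOrDefault(txnId, 0);
    }
}

// Hypothetical exception; stands in for ProducerFencedException.
class ToyFencedException extends RuntimeException {
    ToyFencedException(String message) {
        super(message);
    }
}

// Toy producer: one instance per (thread, task) pair, all sharing the task's txnId.
class ToyProducer {
    private final ToyCoordinator coordinator;
    private final String txnId;
    private int epoch = -1;
    private boolean dataSent = false;

    ToyProducer(ToyCoordinator coordinator, String txnId) {
        this.coordinator = coordinator;
        this.txnId = txnId;
    }

    void initTxn() {
        epoch = coordinator.initTxn(txnId);
    }

    // Modeled as in step 4.b: a stale epoch is detected here.
    void beginTxn() {
        checkFenced("beginTxn");
        dataSent = false;
    }

    void send() {
        checkFenced("send");
        dataSent = true;
    }

    // Returns false when the commit is a no-op because no data was sent.
    boolean commitTxn() {
        if (!dataSent) {
            return false; // nothing reaches the coordinator, so no fence check happens
        }
        checkFenced("commitTxn");
        dataSent = false;
        return true;
    }

    private void checkFenced(String op) {
        if (coordinator.isFenced(txnId, epoch)) {
            throw new ToyFencedException(op + ": epoch " + epoch + " is stale for " + txnId);
        }
    }
}

class FencingScenario {
    // Replays steps 1 to 4; returns true if Ta is fenced at BeginTxn after a no-op commit.
    static boolean taFencedOnResume() {
        ToyCoordinator coordinator = new ToyCoordinator();

        // Step 1: Ta creates the task (InitTxn) and initializes the topology (BeginTxn).
        ToyProducer ta = new ToyProducer(coordinator, "task-0");
        ta.initTxn(); // epoch 1
        ta.beginTxn();

        // Step 2: the task migrates to Tb, whose InitTxn bumps the epoch.
        ToyProducer tb = new ToyProducer(coordinator, "task-0");
        tb.initTxn(); // epoch 2
        tb.beginTxn();

        // Step 4.a: Ta suspends the task; the commit is a no-op since no data was sent.
        boolean committed = ta.commitTxn();

        // Step 4.b: Ta resumes with its old producer and never calls InitTxn again.
        try {
            ta.beginTxn();
            return false;
        } catch (ToyFencedException e) {
            return !committed;
        }
    }
}
```

In this model, a producer that did send data before being superseded is fenced at its commitTxn call instead, which is why the problem only shows up for a txn with no data.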
One proposal for this issue is to close the producer and recreate a new one in `suspend` after the commitTxn call has succeeded and `startNewTxn` is false, so that the new producer will always call `initTxn` to fence others.
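
A rough sketch of that proposal, again as a toy model rather than actual Streams code. `EpochRegistry`, `SketchProducer` and `SketchTask` are hypothetical names, `IllegalStateException` stands in for ProducerFencedException, and the sketch assumes `startNewTxn` is a flag passed to commit that says whether to begin a new txn right after committing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy coordinator: latest epoch per txnId (a simplification).
class EpochRegistry {
    private final Map<String, Integer> latest = new HashMap<>();

    int bump(String txnId) {
        return latest.merge(txnId, 1, Integer::sum);
    }

    int latest(String txnId) {
        return latest.getOrDefault(txnId, 0);
    }
}

// Toy producer; IllegalStateException stands in for ProducerFencedException.
class SketchProducer {
    private final EpochRegistry registry;
    private final String txnId;
    private int epoch = -1;
    private boolean closed = false;

    SketchProducer(EpochRegistry registry, String txnId) {
        this.registry = registry;
        this.txnId = txnId;
    }

    void initTxn() {
        epoch = registry.bump(txnId);
    }

    void beginTxn() {
        if (closed || epoch < registry.latest(txnId)) {
            throw new IllegalStateException("producer is closed or fenced");
        }
    }

    void commitTxn() {
        // No data is ever sent in this sketch, so the commit is a no-op.
    }

    void close() {
        closed = true;
    }

    int epoch() {
        return epoch;
    }
}

class SketchTask {
    private final Supplier<SketchProducer> producerFactory;
    private SketchProducer producer;

    SketchTask(Supplier<SketchProducer> producerFactory) {
        this.producerFactory = producerFactory;
        this.producer = producerFactory.get();
        producer.initTxn();  // task creation
        producer.beginTxn(); // topology initialization
    }

    // Assumed meaning of the flag: begin a new txn right after the commit.
    void commit(boolean startNewTxn) {
        producer.commitTxn();
        if (startNewTxn) {
            producer.beginTxn();
        } else {
            // Proposed change: replace the producer so that InitTxn always runs
            // again and fences any other producer with the same txnId.
            producer.close();
            producer = producerFactory.get();
            producer.initTxn();
        }
    }

    void suspend() {
        commit(false);
    }

    void resume() {
        producer.beginTxn();
    }

    int producerEpoch() {
        return producer.epoch();
    }
}
```

With this change, Ta's suspend bumps the epoch past Tb's, so the BeginTxn on resume succeeds and it is Tb's stale producer that ends up fenced.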