As part of the investigation, I will determine what other versions are affected.
In `StreamTask`, we catch a `ProducerFencedException` and throw a `TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is throwing a `StreamsException`, caused by a `KafkaException`, which is in turn caused by a `ProducerFencedException`.
In response to a `TaskMigratedException`, we would rebalance, but when we get a `StreamsException`, Streams shuts itself down.
In other words, we intended to rebalance in response to a producer fence, but we actually shut down when the fence happens inside the record collector.
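To illustrate the mismatch, here is a minimal, self-contained sketch of how a fence buried in a cause chain is missed by a top-level type check. The exception classes below are hypothetical stand-ins declared locally so the example compiles without Kafka on the classpath (they do not reproduce the real class hierarchy), and `causedByFence` and `react` are illustrative helpers, not Kafka APIs. This is not necessarily how the linked PR addresses the problem.

```java
public class FenceUnwrapSketch {
    // Hypothetical stand-ins for the Kafka exception types named in this ticket.
    static class ProducerFencedException extends RuntimeException {
        ProducerFencedException(String message) { super(message); }
    }
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Walk the cause chain looking for a producer fence (illustrative helper).
    static boolean causedByFence(Throwable t) {
        int depth = 0;
        // The depth bound guards against a cyclic cause chain.
        while (t != null && depth < 16) {
            if (t instanceof ProducerFencedException) {
                return true;
            }
            t = t.getCause();
            depth++;
        }
        return false;
    }

    // Intended behavior: a fence anywhere in the chain leads to a rebalance.
    static String react(RuntimeException e) {
        if (e instanceof TaskMigratedException || causedByFence(e)) {
            return "rebalance";
        }
        return "shutdown";
    }

    public static void main(String[] args) {
        RuntimeException wrapped = new StreamsException("send failed",
            new KafkaException("producer error",
                new ProducerFencedException("fenced")));
        // A check on the outermost type alone would not see the fence.
        System.out.println(wrapped instanceof TaskMigratedException);
        System.out.println(react(wrapped));
    }
}
```

In the sketch, the wrapped exception is not a `TaskMigratedException` at the top level, so a handler that checks only the outermost type would take the shutdown path, while walking the cause chain finds the fence.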
Coincidentally, Guozhang noticed and fixed this in a recent PR: https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6
The scope of this ticket is to extract that fix and associated tests, and send a separate PR to trunk and 2.0, and also to determine what other versions, if any, are affected.