      I've come across an issue with exactly-once processing (running kafka 1.1.0):
      In my test I bring up 3 brokers, and I start sending messages to `topicX`.
      While I'm sending messages, I bring up a few consumers on `topicX` one at a time (all with the same group id) - and they produce the same message to `topicY`. At some point I bring one broker down and up again, to check resiliency to failures.

      Eventually I assert that `topicY` contains exactly the messages sent to `topicX`.
      This usually works as expected, but when running the same test 1000s of times to check for flakiness, some of them act as follows (in this order):
      1. Consumer `C1` owns partition `p`.
      1a. Consumers rebalance occurs (because one of the new consumers is starting).
      1b. Consumer `C1` is revoked and then re-assigned partition `p`.
      2. One of the 3 brokers starts controlled shutdown.
      3. Consumer `C1` uses a transactional producer to send a message on offset `o`.
      4. Consumer `C1` sends offset `o+1` for partition `p` to the transaction.
      5. Consumer `C1` successfully commits the message.
      6. Broker controlled shutdown finishes successfully.
      ... a few seconds after...
      7. Another consumer rebalance occurs, Consumer `C2` is assigned to partition `p`.
      8. Consumer `C2` polls message on offset `o` for partition `p`.

      This means we do double processing for the message on offset `o`, violating exactly-once semantics.

      So it looks like during broker restart, a commit to the transactional producer gets lost - and because we rebalance after that before another commit happened, we actually poll the same message again, although previously committed.

      The brokers are configured with:

      The consumer is configured with

      The original producer to `topicX` has transactional semantics, and the test shows that it didn't send double messages (using idempodent producer config).




