Here is a patch:
1. AbstractFetcherThread.addPartition(): call handleOffsetOutOfRange() if initialOffset < 0
2. I didn't touch ConsumerFetcherManager.doWork() since addFetcher() is only called for partitions that have leaders (which is why 3 is not strictly necessary).
3. ConsumerFetcherThread.handleOffsetOutOfRange(): check partitionErrorAndOffset.error and throw the appropriate exception (this should have been done anyway; I don't think it's strictly necessary for this patch)
3.1 Note: this should probably be done in ReplicaFetcherThread too?
4. ZookeeperConsumerConnector.ZkRebalanceListener: do not compute leaderIdForPartitionMap in rebalance(), and set PartitionTopicInfo offsets to -1 if they are not in ZK (new consumer)
5. PartitionTopicInfo: removed brokerId
6. Fixed the tests so they compile (I am having a hard time running them since ./sbt test does not seem to work well for me)
7. Should we increase the default refresh.leader.backoff.ms? It's a tradeoff between quickly picking up a new leader to consume from (useful when replication is on) and not flooding the broker when there is no leader (or replication is off). 200 ms is very short; something hybrid like "try 5 times at 200 ms backoff, then every 5 min" would cover all the use cases.
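To make items 1 and 3 concrete, here is a hedged sketch of the intended flow: a partition added with a negative initial offset goes through an out-of-range handler, which must inspect the error in the broker's response before trusting the returned offset. All names below (PartitionErrorAndOffset, the error codes, resolveStartingOffset) are simplified stand-ins for the real Kafka classes, not the actual patch code.

```java
// Illustrative sketch only: simplified stand-ins for Kafka's fetcher-side
// offset handling, showing the checks items 1 and 3 describe.
public class OffsetErrorCheck {
    // Simplified subset of broker error codes (hypothetical names).
    enum ErrorCode { NONE, NOT_LEADER_FOR_PARTITION, UNKNOWN }

    // Stand-in for the (error, offset) pair returned in an offset response.
    static class PartitionErrorAndOffset {
        final ErrorCode error;
        final long offset;
        PartitionErrorAndOffset(ErrorCode error, long offset) {
            this.error = error;
            this.offset = offset;
        }
    }

    /** Item 3: check the error code and throw instead of silently using the offset. */
    static long checkedOffset(PartitionErrorAndOffset p) {
        switch (p.error) {
            case NONE:
                return p.offset;
            case NOT_LEADER_FOR_PARTITION:
                throw new IllegalStateException("leader moved; refresh metadata and retry");
            default:
                throw new IllegalStateException("unexpected error in offset response: " + p.error);
        }
    }

    /** Item 1: a negative initial offset means "unknown", so resolve it via the handler. */
    static long resolveStartingOffset(long initialOffset, PartitionErrorAndOffset response) {
        return initialOffset < 0 ? checkedOffset(response) : initialOffset;
    }
}
```

With this shape, a new consumer whose partition has no committed offset in ZK (item 4 sets those offsets to -1) is routed through the checked path instead of fetching from a bogus offset.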
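The hybrid backoff suggested in item 7 could look like the sketch below: a handful of fast retries at the current 200 ms default, then a long interval so a cluster with no leader (or with replication off) is not flooded. The class name and thresholds are illustrative, not part of the patch.

```java
// Hypothetical hybrid leader-refresh backoff: 5 quick retries, then back off hard.
public class LeaderRefreshBackoff {
    static final int  FAST_RETRIES    = 5;            // quick attempts before slowing down
    static final long FAST_BACKOFF_MS = 200;          // current default refresh.leader.backoff.ms
    static final long SLOW_BACKOFF_MS = 5 * 60_000L;  // 5 minutes once fast retries are exhausted

    /** Backoff to wait before the given (0-based) retry attempt. */
    static long backoffMs(int attempt) {
        return attempt < FAST_RETRIES ? FAST_BACKOFF_MS : SLOW_BACKOFF_MS;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 7; attempt++)
            System.out.println("attempt " + attempt + ": wait " + backoffMs(attempt) + " ms");
    }
}
```

This keeps leader failover pickup fast in the common replicated case while bounding the request rate against a broker that has no leader to offer.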
I am running this on test clusters with a mirrormaker, and the error from my initial test case (in the description) no longer occurs.