2.1 Fair enough. I'll move it out of the commitOffsets() API.
>> Well, in general, rebalance is selective
2.2. There is one rebalancing case (which you've listed) where not clearing the queue would help. The code gets slightly more complicated, but I will make the change and include it in the next patch.
>> First of all, it's not clear if the synchronization in the patch gives the exclusive access that you want. This is mainly because the lock is temporarily released in the makeNext() call. This allows other concurrent callers to sneak in. For example, it could be that makeNext() call gets the next chunk in getNextDataChunk, but hasn't updated currentDataChunk yet. At this moment, the fetcher queue is cleared and clearCurrentChunk is called. The latter saves the current offset (which will be used to set the fetch offset after rebalance) and sets currentDataChunk to null. After that, the makeNext() call continues in getNextDataChunk and sets currentDataChunk to a non-null value. This chunk will be fetched again after rebalance and thus introduce duplicates.
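To make the interleaving above concrete, here is a minimal single-threaded sketch that replays the problematic order of events. The class and method names (DataChunk, getNextDataChunk, clearCurrentChunk) mirror the discussion but are illustrative, not the actual Kafka code:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of the race: makeNext() is split into two steps
// with the lock released in between, so a rebalance can run after the
// chunk is dequeued but before currentDataChunk is updated.
class IteratorSketch {
    static class DataChunk {
        final long startOffset;
        DataChunk(long startOffset) { this.startOffset = startOffset; }
    }

    final Queue<DataChunk> fetcherQueue = new ArrayDeque<>();
    final AtomicReference<DataChunk> currentDataChunk = new AtomicReference<>();
    long consumedOffset = 0L;  // offset saved for the post-rebalance fetch

    // Step 1 of makeNext(): dequeue the next chunk (the lock is not
    // held here, so a rebalance can interleave before step 2 runs).
    DataChunk getNextDataChunk() {
        return fetcherQueue.poll();
    }

    // Rebalance path: save the consumed offset and drop the current
    // chunk. If it runs between steps 1 and 2, it sees a null chunk
    // and saves nothing.
    void clearCurrentChunk() {
        DataChunk chunk = currentDataChunk.getAndSet(null);
        if (chunk != null) consumedOffset = chunk.startOffset;
    }

    // Step 2 of makeNext(): publish the chunk obtained in step 1.
    // After a rebalance has already cleared state, this chunk will be
    // fetched again, producing duplicates.
    void publishChunk(DataChunk chunk) {
        currentDataChunk.set(chunk);
    }
}
```

Replaying the steps in the order dequeue, clear (rebalance), publish shows that the rebalance saves no offset while the consumer still holds a chunk that will be re-fetched.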
3. The purpose of adding the locking is to do our best to safely reduce the number of duplicates served by the consumer iterator. Locking needs to be done in such a way that the lock is always released before entering a potentially blocking operation. The case you've pointed out seems to be a very narrow corner case, at least based on the test runs (using KAFKA-227). For example, over hundreds of iterations of that test, no duplicates were reported. Trying to "fix" this case risks a potential deadlock, which we must avoid. Given that, this amount of locking seems reasonable to me.
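The locking discipline described above can be sketched as follows: hold the lock only around non-blocking state updates, and always release it before a potentially blocking call such as a queue take. This is a hedged illustration of the principle, not the actual consumer iterator code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the blocking dequeue happens OUTSIDE the lock; only the
// quick state update is done under it. If the lock were held across
// take(), a rebalance thread trying to clear state could deadlock.
class ChunkFetcher<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private T current;

    T nextChunk() {
        T chunk;
        try {
            chunk = queue.take();   // potentially blocking, lock NOT held
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        lock.lock();
        try {
            current = chunk;        // non-blocking update under the lock
            return chunk;
        } finally {
            lock.unlock();
        }
    }

    void offer(T chunk) { queue.offer(chunk); }

    T currentChunk() {
        lock.lock();
        try { return current; } finally { lock.unlock(); }
    }
}
```

The trade-off is exactly the one noted above: because the lock is released around the blocking call, a rebalance can interleave there, so rare duplicates remain possible but deadlock does not.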
>> Second, calling commitOffsets inside consumerIterator seems a bit complicated. I am wondering if we can commit offsets outside of consumerIterator.
3. That protects against duplication of the last data chunk. The best place to move this out of the consumer iterator is ZookeeperConsumerConnector's closeFetchers(), after clearing the queue and clearing the current iterator. If this change is made, the number of writes to zookeeper will also be reduced.
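The ordering suggested above can be sketched as follows. The method names mirror the discussion (closeFetchers, commitOffsets) but the structure is illustrative, not the actual ZookeeperConsumerConnector:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed closeFetchers() ordering: clear the fetcher
// queues and the current chunk first, then commit offsets exactly
// once, so the last data chunk is not re-consumed after rebalance.
class CloseFetchersSketch {
    final List<String> steps = new ArrayList<>();

    void clearQueues()       { steps.add("clearQueues"); }
    void clearCurrentChunk() { steps.add("clearCurrentChunk"); }
    void commitOffsets()     { steps.add("commitOffsets"); }

    void closeFetchers() {
        clearQueues();
        clearCurrentChunk();
        commitOffsets();  // one commit here, instead of inside the
                          // iterator, reduces writes to ZooKeeper
    }
}
```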
In our offline chat, you mentioned that you want to try refactoring the patch to simplify the consumer iterator code by removing the lock altogether and depending only on the atomic references. Would you like to give that a try and upload another patch?
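For reference, the lock-free direction could look roughly like this: keep the current chunk in an AtomicReference and use getAndSet so that exactly one of the consumer thread and the rebalance thread takes ownership of the chunk. This is only a sketch of the idea under discussion, with illustrative names:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the rebalance path uses getAndSet(null) to atomically take
// the chunk, so the offset is saved at most once and no lock is held.
class AtomicChunkHolder {
    // chunk modeled as { startOffset } for brevity
    private final AtomicReference<long[]> current = new AtomicReference<>();

    // Consumer side: publish the chunk it is about to iterate.
    void publish(long startOffset) {
        current.set(new long[] { startOffset });
    }

    // Rebalance side: atomically claim the chunk; returns its offset,
    // or null if there was no current chunk to clear.
    Long clearAndGetOffset() {
        long[] chunk = current.getAndSet(null);
        return chunk == null ? null : chunk[0];
    }
}
```

Whether this fully closes the window described earlier would still need to be verified, since the dequeue-then-publish gap exists independently of how the current chunk is stored.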