When partitions are being revoked (client received revocation on heartbeat and is in the process of invoking the callback), we need to make sure we do not fetch from those partitions anymore:
- no new fetches should be sent out for the partitions being revoked
- no fetch responses should be handled for those partitions (the case where a fetch was already in flight when the partition revocation started).
This does not seem to be handled in the current KafkaConsumer and the old consumer protocol (it is handled only for the EAGER protocol).
Consider re-using the existing pendingRevocation logic that already exists in the subscriptionState and is used by the fetcher to determine whether a partition is fetchable.
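
A minimal sketch of the behaviour described above, written against a simplified stand-in rather than the real classes. The class RevocationAwareState, its method names, and the use of plain strings for partitions are assumptions made for illustration; the actual subscriptionState and fetcher code may differ. The idea is that a single pending-revocation flag gates both the building of new fetch requests and the handling of responses that were already in flight.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the consumer's subscription state.
// Partitions are plain strings such as "t-0"; this is not the Kafka class.
class RevocationAwareState {
    // partition -> true while its revocation callback has not completed yet
    private final Map<String, Boolean> pendingRevocation = new LinkedHashMap<>();

    void assign(String partition) {
        pendingRevocation.put(partition, false);
    }

    // Assumed to be called when the heartbeat response revokes partitions,
    // before the revocation callback is invoked.
    void markPendingRevocation(Collection<String> partitions) {
        for (String p : partitions) {
            pendingRevocation.computeIfPresent(p, (k, v) -> true);
        }
    }

    // Assumed to be called once the revocation callback has returned.
    void completeRevocation(Collection<String> partitions) {
        pendingRevocation.keySet().removeAll(partitions);
    }

    // A partition is fetchable only if it is assigned and not being revoked.
    boolean isFetchable(String partition) {
        return Boolean.FALSE.equals(pendingRevocation.get(partition));
    }

    // Bullet 1: new fetch requests only include fetchable partitions.
    List<String> fetchablePartitions() {
        List<String> result = new ArrayList<>();
        for (String p : pendingRevocation.keySet()) {
            if (isFetchable(p)) {
                result.add(p);
            }
        }
        return result;
    }

    // Bullet 2: data from an in-flight response is dropped for partitions
    // that became unfetchable after the request was sent.
    Map<String, List<String>> filterResponse(Map<String, List<String>> response) {
        Map<String, List<String>> kept = new HashMap<>();
        for (Map.Entry<String, List<String>> e : response.entrySet()) {
            if (isFetchable(e.getKey())) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }
}
```

In this sketch the flag is checked twice on purpose: once when a request is built and again when the response is processed, because the revocation can start between those two points.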