There is a bug that occurs when failed tuples are invalidated due to Kafka throwing a TopicOffsetOutOfRangeException.
Below is what happens:
- Spout emits tuples
- Offsets are added to the pending tree
- Some tuples fail and are added to the failedMsgRetryManager
- On the next fetch request, a TopicOffsetOutOfRangeException is thrown, and the new offset is greater than the offsets currently sitting in both the pending tree and the failedMsgRetryManager
- All offsets smaller than the new offset are removed from the failedMsgRetryManager, but not from the pending tree.
- Since those offsets were removed from the failedMsgRetryManager, they will never be retried and thus never get removed from the pending tree
- lastCommittedOffset() will always return the same value, which means that the offset in ZooKeeper for that partition will never get updated.
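The sequence above can be reproduced with a small, self-contained model. This is a sketch under assumptions, not the storm-kafka code: the class `PendingLeakDemo`, its methods, and the use of two `TreeSet<Long>` instances as stand-ins for the pending tree and the failedMsgRetryManager are all hypothetical, retries are not modeled, and the `clearPendingOnReset` switch illustrates one possible fix (also dropping pending offsets below the new offset), not a confirmed patch.

```java
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of one partition of a Kafka spout.
 * Names are illustrative; they only loosely mirror storm-kafka internals.
 */
class PendingLeakDemo {
    // Offsets emitted but not yet acked (stand-in for the pending tree).
    private final TreeSet<Long> pending = new TreeSet<>();
    // Failed offsets awaiting retry (stand-in for failedMsgRetryManager; retries are not modeled).
    private final TreeSet<Long> failed = new TreeSet<>();
    // Hypothetical switch: true applies one possible fix on offset reset.
    private final boolean clearPendingOnReset;
    private long emittedToOffset;

    PendingLeakDemo(long startOffset, boolean clearPendingOnReset) {
        this.emittedToOffset = startOffset;
        this.clearPendingOnReset = clearPendingOnReset;
    }

    void emit(long offset) {
        pending.add(offset);
        emittedToOffset = offset + 1;
    }

    void ack(long offset) {
        pending.remove(offset);
    }

    // A failed offset stays in pending until it is retried and acked.
    void fail(long offset) {
        failed.add(offset);
    }

    // Called when a fetch throws TopicOffsetOutOfRangeException and the
    // consumer is reset to newOffset.
    void onOffsetOutOfRange(long newOffset) {
        emittedToOffset = newOffset;
        // Behavior described in the report: older failed offsets are dropped...
        failed.headSet(newOffset).clear();
        // ...but unless they are also dropped here, they stay pending forever.
        if (clearPendingOnReset) {
            pending.headSet(newOffset).clear();
        }
    }

    // Stand-in for lastCommittedOffset(): the offset that would be written to ZooKeeper.
    long lastCommittedOffset() {
        return pending.isEmpty() ? emittedToOffset : pending.first();
    }

    static long runScenario(boolean clearPendingOnReset) {
        PendingLeakDemo pm = new PendingLeakDemo(100, clearPendingOnReset);
        for (long o = 100; o <= 104; o++) {
            pm.emit(o);
        }
        pm.ack(100);
        pm.fail(101);
        pm.fail(102);
        pm.ack(103);
        pm.ack(104);
        // Kafka reports the offset as out of range; reading resumes at 200.
        pm.onOffsetOutOfRange(200);
        pm.emit(200);
        pm.ack(200);
        return pm.lastCommittedOffset();
    }

    public static void main(String[] args) {
        System.out.println("without clearing pending: " + runScenario(false)); // 101
        System.out.println("with clearing pending:    " + runScenario(true));  // 201
    }
}
```

Without the extra clear, offsets 101 and 102 remain pending with nothing left to retry them, so the committed offset stays at 101 no matter how many new messages are acked. Clearing pending entries below the new offset lets it advance to 201.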