Description
In the case of the consumer.poll timing out, the KafkaRecordReader could return with no values in the records iterator which then causes the KafkaRecordReader to exit prematurely[1]. To fix this we need to probably better track the start/end offset and that progress vs the null value.
Additionally if we wanted to be smarter we could do some "backoff" for the timeout or just have consumer specify a larger value.