Details
Bug
Status: Resolved
Trivial
Resolution: Fixed
3.18.2, 3.19.0
None
Novice
Description
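In the updateTaskState() method, if the consumer is resumed and a partition has no previously committed offset, a NullPointerException is thrown when the code reads ((OffsetAndMetadata) v).offset(). The existing null check only covers the map returned by consumer.committed(consumer.assignment()); the map itself is non-null, but its value is null for any partition without a committed offset, so v is null inside the forEach lambda.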
private void updateTaskState() {
    switch (state) {
        case PAUSE_REQUESTED:
            LOG.info("Pausing the consumer as a response to a pause request");
            consumer.pause(consumer.assignment());
            state = State.PAUSED;
            break;
        case RESUME_REQUESTED:
            LOG.info("Resuming the consumer as a response to a resume request");
            if (consumer.committed(consumer.assignment()) != null) {
                consumer.committed(consumer.assignment()).forEach((k, v) -> {
                    final TopicPartition tp = ((TopicPartition) k);
                    LOG.info("Resuming from offset {} for the topic {} with partition {}",
                            ((OffsetAndMetadata) v).offset(), tp.topic(), tp.partition());
                    consumer.seek(tp, ((OffsetAndMetadata) v).offset());
                });
            }
            consumer.resume(consumer.assignment());
            state = State.RUNNING;
            break;
        default:
            break;
    }
}
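
One way the resume branch could guard against this is sketched below. This is not the committed fix; it only illustrates the idea of skipping partitions whose committed value is null. To keep the example runnable without the Kafka client on the classpath, it uses plain Java stand-ins: ResumeSeekSketch, Offset and seekTargets are hypothetical names, partitions are represented as strings, and the returned list stands in for the consumer.seek() calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResumeSeekSketch {

    // Hypothetical stand-in for Kafka's OffsetAndMetadata; only offset() is modeled.
    public static final class Offset {
        private final long offset;

        public Offset(long offset) {
            this.offset = offset;
        }

        public long offset() {
            return offset;
        }
    }

    // Returns "partition@offset" for every partition that has a committed offset.
    // In the real method each entry would correspond to a consumer.seek(tp, offset) call.
    public static List<String> seekTargets(Map<String, Offset> committed) {
        List<String> seeks = new ArrayList<>();
        if (committed == null) {
            return seeks;
        }
        committed.forEach((tp, v) -> {
            if (v == null) {
                // No committed offset for this partition: skip the seek
                // instead of dereferencing a null value.
                return;
            }
            seeks.add(tp + "@" + v.offset());
        });
        return seeks;
    }

    public static void main(String[] args) {
        Map<String, Offset> committed = new HashMap<>();
        committed.put("orders-0", new Offset(42L)); // partition with a committed offset
        committed.put("orders-1", null);            // partition never committed
        System.out.println(seekTargets(committed));
    }
}
```

With this guard, a partition without a committed offset is simply left at whatever position the consumer would otherwise use, and the resume() call that follows still applies to the whole assignment.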