Details
Bug
Status: Open
Major
Resolution: Unresolved
1.0.1, 1.0.2, 1.1.0
None
None
Important
Description
If a partition offset is out of range, the Kafka spout stops emitting new messages and keeps logging the following warning:
2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition
Got fetch request with offset out of range: [3]
2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition
Got fetch request with offset out of range: [3]
...
I believe there is a trivial fix in the fill method of PartitionManager.java.
line 237:
long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
if (partitionLatestOffset < offset) {
    offset = partitionLatestOffset;
} else {
    offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
}
change to:
offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, _spoutConfig.startOffsetTime);
line 259:
if (offset > _emittedToOffset) {
    _lostMessageCount.incrBy(offset - _emittedToOffset);
    _emittedToOffset = offset;
    LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
}
change to:
if (offset > _emittedToOffset) {
    _lostMessageCount.incrBy(offset - _emittedToOffset);
}
_emittedToOffset = offset;
LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
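To illustrate why the change at line 259 matters, here is a minimal sketch of the offset bookkeeping only. It is not the real PartitionManager: the class OffsetResetSketch and its methods are hypothetical stand-ins, and the numbers are made up. It assumes the stuck case is one where the replacement offset is lower than _emittedToOffset (for example, the latest offset on the broker is now below the spout's position), so the current guard never moves the position.

```java
// Hypothetical stand-in for the bookkeeping in PartitionManager.fill().
// It does not use any Storm or Kafka classes.
final class OffsetResetSketch {
    long emittedToOffset;   // mirrors _emittedToOffset
    long lostMessageCount;  // mirrors _lostMessageCount

    OffsetResetSketch(long emittedToOffset) {
        this.emittedToOffset = emittedToOffset;
    }

    // Current behaviour: the position is only updated when the new offset is higher.
    void resetCurrent(long offset) {
        if (offset > emittedToOffset) {
            lostMessageCount += offset - emittedToOffset;
            emittedToOffset = offset;
        }
    }

    // Proposed behaviour: always adopt the new offset;
    // count lost messages only when skipping forward.
    void resetProposed(long offset) {
        if (offset > emittedToOffset) {
            lostMessageCount += offset - emittedToOffset;
        }
        emittedToOffset = offset;
    }

    public static void main(String[] args) {
        // Assumed scenario: the spout is at offset 100, the replacement offset is 40.
        OffsetResetSketch current = new OffsetResetSketch(100);
        current.resetCurrent(40);
        System.out.println("current:  " + current.emittedToOffset);

        OffsetResetSketch proposed = new OffsetResetSketch(100);
        proposed.resetProposed(40);
        System.out.println("proposed: " + proposed.emittedToOffset);
    }
}
```

Under that assumption, the current logic leaves the position at the out-of-range offset, so the next fetch fails the same way, while the proposed logic moves it to the replacement offset.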