Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- 0.9.2-incubating, 0.9.3
- None
Description
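KafkaUtils repeatedly fetches messages whose offset is out of range.
This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range. That offset is never removed from the list, so every subsequent call to fill() retries it and fails again.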
[FIX]
storm.kafka.PartitionManager.fill():
...
try {
    msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (UpdateOffsetException e) {
    _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);
    // Fix: if the out-of-range offset came from the failed list, remove it so it is not retried forever.
    if (had_failed) {
        failed.remove(offset);
    }
    // Fetch failed, so don't update the metrics.
    return;
}
...
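The effect of the fix can be illustrated with a small standalone simulation. This is only a sketch under stated assumptions, not the actual storm-kafka code: the class FailedOffsetSketch, the EARLIEST constant, the fetch() stub, and the OffsetOutOfRange exception are all hypothetical stand-ins for the broker and for PartitionManager.fill(). It counts how many fetches hit an out-of-range offset with and without removing that offset from the failed set.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

public class FailedOffsetSketch {
    // Hypothetical earliest offset still retained by the broker.
    static final long EARLIEST = 100L;

    // Hypothetical stand-in for the out-of-range error raised by a fetch.
    static class OffsetOutOfRange extends RuntimeException {}

    // Stub fetch: fails for any offset older than EARLIEST.
    static void fetch(long offset) {
        if (offset < EARLIEST) {
            throw new OffsetOutOfRange();
        }
    }

    // Simulates maxCalls invocations of a fill()-like loop and returns
    // how many of them hit an out-of-range offset.
    static int countOutOfRangeFetches(SortedSet<Long> failed, boolean applyFix, int maxCalls) {
        long emittedToOffset = 200L;
        int outOfRange = 0;
        for (int i = 0; i < maxCalls; i++) {
            boolean hadFailed = !failed.isEmpty();
            // Failed offsets are retried first, lowest offset first.
            long offset = hadFailed ? failed.first() : emittedToOffset;
            try {
                fetch(offset);
            } catch (OffsetOutOfRange e) {
                outOfRange++;
                emittedToOffset = EARLIEST; // reset to a valid offset
                if (applyFix && hadFailed) {
                    failed.remove(offset); // the proposed fix
                }
                continue; // corresponds to the early return in fill()
            }
            if (hadFailed) {
                failed.remove(offset); // retried message was fetched again
            } else {
                emittedToOffset++;
            }
        }
        return outOfRange;
    }

    public static void main(String[] args) {
        SortedSet<Long> a = new TreeSet<>(Arrays.asList(50L, 150L));
        SortedSet<Long> b = new TreeSet<>(Arrays.asList(50L, 150L));
        System.out.println("without fix: " + countOutOfRangeFetches(a, false, 5)); // without fix: 5
        System.out.println("with fix: " + countOutOfRangeFetches(b, true, 5));     // with fix: 1
    }
}
```

Without the fix, offset 50 stays at the head of the failed set and every call fails on it; with the fix, it is dropped after the first failure and the remaining calls proceed normally.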
Issue Links
- is duplicated by: STORM-643 KafkaUtils repeatedly fetches messages whose offset is out of range (Resolved)