Details
Type: Question
Status: Closed
Priority: Minor
Resolution: Fixed
0.10.0
None
None
Description
The Storm ISpout documentation says: 'Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor of an ISpout does not need to worry about concurrency issues between those methods. However, it also means that an implementor must ensure that nextTuple is non-blocking: otherwise the method could block acks and fails that are pending to be processed.'
However, KafkaSpout has the following nextTuple() implementation:
    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {
            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    // body lost in the original post; the storm-kafka source advances to the next partition here
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            // body lost in the original post; the storm-kafka source commits offsets here
            commit();
        }
    }
When there is a slow bolt in the topology chain, we see events being replayed, which causes duplicate messages.
Is there any way to fix this?
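
To make the single-thread contract quoted from the ISpout documentation concrete, here is a minimal toy model. It is not Storm code and uses no Storm classes; the class name SpoutLoopSketch, the method maxAckDelayMs, and all the numbers are hypothetical and chosen only for illustration. It simulates, in virtual time, one thread that alternates between draining queued acks and calling nextTuple, and reports how long an ack can sit unprocessed when nextTuple holds the thread.

```java
public class SpoutLoopSketch {

    /**
     * Simulates a single spout thread for totalMs of virtual time.
     * One ack arrives every ackEveryMs; each nextTuple() call occupies
     * the thread for nextTupleMs. Returns the longest time any ack
     * waited in the queue before the thread got around to processing it.
     */
    public static long maxAckDelayMs(long nextTupleMs, long ackEveryMs, long totalMs) {
        long now = 0;                     // virtual clock, in milliseconds
        long nextAckArrival = ackEveryMs; // arrival time of the oldest unprocessed ack
        long maxDelay = 0;
        while (now < totalMs) {
            // Drain every ack that arrived while the thread was busy.
            while (nextAckArrival <= now) {
                maxDelay = Math.max(maxDelay, now - nextAckArrival);
                nextAckArrival += ackEveryMs;
            }
            // nextTuple() runs on the same thread, so no ack is processed meanwhile.
            now += nextTupleMs;
        }
        return maxDelay;
    }

    public static void main(String[] args) {
        // Fast nextTuple: acks are picked up as soon as they arrive.
        System.out.println(maxAckDelayMs(1, 10, 1000));   // prints 0
        // nextTuple holding the thread for 500 ms: acks wait up to 490 ms.
        System.out.println(maxAckDelayMs(500, 10, 1000)); // prints 490
    }
}
```

In this model the ack delay grows with the time spent inside nextTuple, which is the effect the documentation warns about. Whether that effect is what drives the replays described here depends on the actual fetch latency and topology settings, which the sketch does not model.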