Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-2241

KafkaSpout implementaion

    XMLWordPrintableJSON

Details

    • Question
    • Status: Closed
    • Minor
    • Resolution: Fixed
    • 0.10.0
    • None
    • storm-kafka
    • None

    Description

      Storm ISpout documentaion say 'Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor of an ISpout does not need to worry about concurrency issues between those methods. However, it also means that an implementor must ensure that nextTuple is non-blocking: otherwise the method could block acks and fails that are pending to be processed.'

      Where as KafkaSpout has below nextTuple() implementation
      @Override
      public void nextTuple() {
      List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
      for (int i = 0; i < managers.size(); i++) {

      try {
      // in case the number of managers decreased
      _currPartitionIndex = _currPartitionIndex % managers.size();
      EmitState state = managers.get(_currPartitionIndex).next(_collector);
      if (state != EmitState.EMITTED_MORE_LEFT)

      { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); }

      if (state != EmitState.NO_EMITTED)

      { break; }

      } catch (FailedFetchException e)

      { LOG.warn("Fetch failed", e); _coordinator.refresh(); }

      }

      long now = System.currentTimeMillis();
      if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs)

      { commit(); }

      }

      We are seeing events are getting replayed when there is slower bolt in the topology chain causing duplicate messages.

      Is there any way this can be fixed.

      Attachments

        Activity

          People

            Unassigned Unassigned
            sravand2001 SHRAVANKUMAR DUBBUDU
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: