Apache Storm / STORM-2624

Kafka Storm Spout: Got fetch request with offset out of range


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.0.1, 1.0.2, 1.1.0
    • Fix Version/s: None
    • Component/s: storm-kafka
    • Labels: None
    • Flags: Important

    Description

      If a partition offset is out of range, the Kafka spout stops emitting new messages and keeps logging the following warning:
      2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
      2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch request with offset out of range: [3]
      ...

      I believe the trivial fix is in the fill method of PartitionManager.java.
      line 237:

                  long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
                  if (partitionLatestOffset < offset) {
                      offset = partitionLatestOffset;
                  } else {
                      offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
                  }
      

      change to:

                  offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, _spoutConfig.startOffsetTime);
      

      line 259:

                  if (offset > _emittedToOffset) {
                      _lostMessageCount.incrBy(offset - _emittedToOffset);
                      _emittedToOffset = offset;
                      LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
                  }
      

      change to:

                  if (offset > _emittedToOffset) {
                      _lostMessageCount.incrBy(offset - _emittedToOffset);
                  }
                  _emittedToOffset = offset;
                  LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
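
      The following is a minimal, self-contained sketch of one reading of why the current logic can loop and how the proposed change behaves. It does not use storm-kafka or a Kafka broker: the class OffsetResetSketch, the getOffset stand-in, the methods recoverCurrent and recoverProposed, and the example broker range (earliest 0, latest 2) are all hypothetical and exist only for illustration. The sentinel values -2 and -1 mirror kafka.api.OffsetRequest.EarliestTime() and LatestTime().

```java
public class OffsetResetSketch {
    // Sentinels mirroring kafka.api.OffsetRequest.EarliestTime() / LatestTime().
    public static final long EARLIEST_TIME = -2L;
    public static final long LATEST_TIME = -1L;

    // Hypothetical stand-in for KafkaUtils.getOffset: resolves a time
    // sentinel against an assumed broker range [earliest, latest].
    public static long getOffset(long earliest, long latest, long time) {
        return time == LATEST_TIME ? latest : earliest;
    }

    // Assumption for this sketch: a fetch offset is valid when it lies
    // between the earliest and latest offsets, inclusive.
    public static boolean inRange(long offset, long earliest, long latest) {
        return offset >= earliest && offset <= latest;
    }

    // Models the recovery logic quoted in the report (lines 237 and 259).
    // Returns the resulting emittedToOffset.
    public static long recoverCurrent(long offset, long emittedToOffset,
                                      long earliest, long latest) {
        long partitionLatestOffset = getOffset(earliest, latest, LATEST_TIME);
        if (partitionLatestOffset < offset) {
            offset = partitionLatestOffset;
        } else {
            offset = getOffset(earliest, latest, EARLIEST_TIME);
        }
        // Only moves forward, so a reset to a smaller offset is ignored.
        if (offset > emittedToOffset) {
            emittedToOffset = offset;
        }
        return emittedToOffset;
    }

    // Models the proposed change: reset to the configured start offset
    // time and always adopt the result as the new emittedToOffset.
    public static long recoverProposed(long earliest, long latest,
                                       long startOffsetTime) {
        return getOffset(earliest, latest, startOffsetTime);
    }

    public static void main(String[] args) {
        long earliest = 0L, latest = 2L; // assumed broker state
        long emitted = 3L;               // offset from the warning: [3]

        long current = recoverCurrent(emitted, emitted, earliest, latest);
        long proposed = recoverProposed(earliest, latest, EARLIEST_TIME);

        System.out.println("current logic  -> " + current
                + " (in range: " + inRange(current, earliest, latest) + ")");
        System.out.println("proposed logic -> " + proposed
                + " (in range: " + inRange(proposed, earliest, latest) + ")");
    }
}
```

      Under these assumptions the current logic leaves the emitted offset at 3, which is still out of range, so the next fetch fails the same way. The proposed logic moves it to 0 (or to 2 if LATEST_TIME is passed), which is inside the assumed range.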
      

          People

            Assignee: Unassigned
            Reporter: Sergiy Kharytesku (sergiyk)
            Votes: 0
            Watchers: 2
