  1. Apache Storm
  2. STORM-2292

Kafka spout enhancement for out-of-range edge cases

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.0
    • Fix Version/s: None
    • Component/s: storm-kafka
    • Labels: None

    Description

      @hmcl and all, we have been communicating by email for a while; going forward, let's use this thread so everyone is on the same page.
      Based on the community spout (written by you), we have made several fixes, and the result has run stably in our production environment for about 6 months.

      We would like to share our latest spout with you. Could you please review it and merge any reasonable fixes into the community version? We want to avoid diverging too much from the community version.

      Below are our major fixes:

      For a failed message, the nextTuple method originally seeks back to the failed (non-continuous) offset so that the message is polled again for retry. Suppose we seek back to message 10 for retry, but the Kafka log file has since been purged and the earliest offset is now 1000. The seek to 10 is then reset to 1000 as per the reset policy, message 10 can never be polled, and the spout stops working.
      Our fix: we manually catch the out-of-range exception, commit the earliest offset first, and then seek to the earliest offset.
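
The recovery step above can be sketched without a broker. This is a minimal, self-contained simulation, not the code from the attached KafkaSpout.java.txt: `PartitionLog`, `OutOfRangeException`, and `seekForRetry` are hypothetical names that stand in for the Kafka consumer, its out-of-range error, and the spout's retry seek.

```java
public class OutOfRangeRecovery {
    /** Hypothetical stand-in for the consumer's out-of-range error. */
    static class OutOfRangeException extends RuntimeException {
        OutOfRangeException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory model of one partition whose old segments may be purged. */
    static class PartitionLog {
        final long earliest; // first offset still retained
        final long latest;   // offset the next written message would get
        long committed;      // last committed offset
        long position;       // next offset to be polled

        PartitionLog(long earliest, long latest, long committed) {
            this.earliest = earliest;
            this.latest = latest;
            this.committed = committed;
            this.position = committed;
        }

        /** Moves the poll position, rejecting offsets that are no longer retained. */
        void seek(long offset) {
            if (offset < earliest || offset > latest) {
                throw new OutOfRangeException("offset " + offset + " is out of range");
            }
            position = offset;
        }

        void commit(long offset) {
            committed = offset;
        }
    }

    /** Seeks back for a retry and returns the offset that was actually reached. */
    static long seekForRetry(PartitionLog log, long failedOffset) {
        try {
            log.seek(failedOffset);
            return failedOffset;
        } catch (OutOfRangeException e) {
            // The failed message was purged: commit the earliest offset first,
            // then seek to it, so the spout keeps making progress.
            log.commit(log.earliest);
            log.seek(log.earliest);
            return log.earliest;
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog(1000, 5000, 10);
        long reached = seekForRetry(log, 10);
        System.out.println(reached + " " + log.committed + " " + log.position);
    }
}
```

In this sketch the purged message 10 is given up on; whether that loss is acceptable depends on the topology's delivery requirements.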

      Currently, the way the next offset to commit is found is very complex, especially under some edge cases: a) no messages are acked because a bolt has an issue or cannot keep up with the spout's emit rate; b) seek-backs happen frequently, much faster than messages are acked.
      Our fix: we give each message a status: None, emitted, acked, or failed (if the failure count exceeds the maximum number of retries, the status is set to acked).
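
One way to picture the per-message status is the sketch below. It is an illustration under assumptions, not the attached implementation: the class `OffsetStatusTracker` and its methods are hypothetical, and offsets within the partition are assumed to be contiguous. The next offset to commit is simply the end of the unbroken run of acked offsets.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetStatusTracker {
    enum Status { NONE, EMITTED, ACKED, FAILED }

    private final Map<Long, Status> statuses = new HashMap<>();
    private final Map<Long, Integer> failCounts = new HashMap<>();
    private final int maxRetries;
    private long nextToCommit; // first offset that has not been committed yet

    OffsetStatusTracker(long startOffset, int maxRetries) {
        this.nextToCommit = startOffset;
        this.maxRetries = maxRetries;
    }

    void emit(long offset) {
        statuses.put(offset, Status.EMITTED);
    }

    void ack(long offset) {
        statuses.put(offset, Status.ACKED);
    }

    /** Marks a failure; once the failure count exceeds maxRetries the offset is treated as acked. */
    void fail(long offset) {
        int failures = failCounts.merge(offset, 1, Integer::sum);
        statuses.put(offset, failures > maxRetries ? Status.ACKED : Status.FAILED);
    }

    Status statusOf(long offset) {
        return statuses.getOrDefault(offset, Status.NONE);
    }

    /** Advances past the contiguous run of acked offsets and returns the new commit position. */
    long advanceCommit() {
        while (statusOf(nextToCommit) == Status.ACKED) {
            statuses.remove(nextToCommit);
            failCounts.remove(nextToCommit);
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        OffsetStatusTracker tracker = new OffsetStatusTracker(0, 1);
        tracker.emit(0);
        tracker.emit(1);
        tracker.emit(2);
        tracker.ack(0);
        tracker.ack(2);
        // Offset 1 is still pending, so the commit position stops there.
        System.out.println(tracker.advanceCommit());
    }
}
```

Because the commit position only moves over acked offsets, a slow bolt or frequent seek-backs leave it where it is instead of requiring special-case logic.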

      One of our use cases needs ordering at the partition level, so after seeking back for retry we re-emit all the following messages, whether or not they have already been emitted. If possible, you could make this configurable: either re-emit all messages from the failed one onward, or emit only the failed one, as the current version does.
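
The two behaviours could be exposed as a single switch. The sketch below is hypothetical: `RetryEmitPolicy`, `Mode`, and `offsetsToEmit` are illustrative names rather than existing storm-kafka configuration, and the list `polled` is assumed to be what the consumer returns after seeking back to the failed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetryEmitPolicy {
    /** Hypothetical option: retry only failed messages, or replay everything from the failed one. */
    enum Mode { FAILED_ONLY, REPLAY_FROM_FAILED }

    /**
     * Chooses which polled offsets to emit after a seek-back.
     * FAILED_ONLY skips offsets that were already emitted and have not failed;
     * REPLAY_FROM_FAILED re-emits every polled offset, which preserves partition order.
     */
    static List<Long> offsetsToEmit(Mode mode, List<Long> polled,
                                    Set<Long> alreadyEmitted, Set<Long> failed) {
        List<Long> result = new ArrayList<>();
        for (Long offset : polled) {
            boolean isRetry = failed.contains(offset);
            boolean isNew = !alreadyEmitted.contains(offset);
            if (mode == Mode.REPLAY_FROM_FAILED || isRetry || isNew) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> polled = Arrays.asList(10L, 11L, 12L, 13L);
        Set<Long> emitted = new HashSet<>(Arrays.asList(10L, 11L, 12L));
        Set<Long> failed = new HashSet<>(Arrays.asList(10L));
        System.out.println(offsetsToEmit(Mode.FAILED_ONLY, polled, emitted, failed));
        System.out.println(offsetsToEmit(Mode.REPLAY_FROM_FAILED, polled, emitted, failed));
    }
}
```

Replaying from the failed message trades duplicate emits for ordering, so downstream bolts would need to tolerate seeing the same message more than once.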

      We record the counts of acked, failed, and emitted messages, just for statistics.

      Could you please review and let us know whether you can merge it into the community version? If you have any comments or concerns, please feel free to let us know. Our code is attached to this Jira issue.

      Attachments

        1. KafkaSpout.java.txt
          285 kB
          WayneZhou

          People

            hmclouro Hugo Da Cruz Louro
            jianbzhou WayneZhou
            Votes: 0
            Watchers: 2

              Time Tracking

                Original Estimate: 336h
                Remaining Estimate: 336h
                Time Spent: Not Specified