Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-2228

KafkaSpout does not replay properly when a topic maps to multiple streams

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
    • Fix Version/s: 2.0.0, 1.1.0
    • Component/s: storm-kafka-client
    • Labels:
      None

      Description

      In the example.

      KafkaSpoutTopologyMainNamedTopics.java

      The code creates a TuplesBuilder and a KafkaSpoutStreams

      protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
          return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                  new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
                  new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
                  .build();
      }
      
      protected KafkaSpoutStreams getKafkaSpoutStreams() {
          final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
          final Fields outputFields1 = new Fields("topic", "partition", "offset");
          return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
                  .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
                  .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
                  .build();
      }
      

      Essentially the code is trying to take TOPICS[0], TOPICS[1], and TOPICS[2] translate them to Fields("topic", "partition", "offset", "key", "value") and output them on STREAMS[0]. Then just for TOPICS[2] they want it to be output as Fields("topic", "partition", "offset") to STREAMS[2]. (Don't know what happened to STREAMS[1])

      There are two issues here. First with how the TupleBuilder and the SpoutStreams are split up, but coupled STREAMS[2] is actually getting the full "topic" "partition" "offset" "key" "value", but this minor. The real issue is that the code uses the same KafkaSpoutMessageId for all the tuples emitted to both STREAMS[1] and STREAMS[2].

      https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304

      The code, however, is written to assume that it will only ever get one ack/fail for a given KafkaSpoutMessageId. This means that if one of the emitted tuple trees succeed and then the other fails, the failure will not result in anything being replayed! This violates how storm is intended to work.

      I discovered this as a part of STORM-2225, and I am fine with fixing it on STORM-2225 (I would just remove support for that functionality because there are other ways of doing this correctly). But that would not maintain backwards compatibility and I am not sure it would be appropriate for 1.x releases. I really would like to have feedback from others on this.

      I can put something into 1.x where it will throw an exception if acking is enabled and this situation is present, but I don't want to spend the time tying to do reference counting on the number of tuples actually emitted. If someone else wants to do that I would be happy to turn this JIRA over to them.

        Attachments

          Activity

            People

            • Assignee:
              revans2 Robert Joseph Evans
              Reporter:
              revans2 Robert Joseph Evans
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: