Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-3818

Joining more than 2 streams in Stream API

    XMLWordPrintableJSON

Details

    Description

      Joining two streams in storms Stream API works fine as described in the docs with the scheme: streamA.window(...).join(streamB)

      However, when joining the result with a third join, a `punctuation error` is thrown.

      So there is a way missing of joining three streams? I think, that one window configuration is enough. So the pseudocode would look like: streamA.window(...).join(streamB).join(streamC). However, this raises a punctuation error.

      Here is the error log:

      Caused by: java.lang.IllegalStateException: Received punctuation from streams [s6] expected [s7]
          at org.apache.storm.streams.ProcessorBoltDelegate.shouldPunctuate(ProcessorBoltDelegate.java:287) ~[classes/:?]
          at org.apache.storm.streams.ProcessorBoltDelegate.punctuateInitialProcessors(ProcessorBoltDelegate.java:189) ~[classes/:?]
          at org.apache.storm.streams.ProcessorBoltDelegate.process(ProcessorBoltDelegate.java:179) ~[classes/:?]
          at org.apache.storm.streams.WindowedProcessorBolt.execute(WindowedProcessorBolt.java:68) ~[classes/:?]
          at org.apache.storm.topology.WindowedBoltExecutor.boltExecute(WindowedBoltExecutor.java:371) ~[classes/:?]
          at org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:364) ~[classes/:?]
          at org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:156) ~[classes/:?]
          at org.apache.storm.windowing.TimeTriggerPolicy$1.run(TimeTriggerPolicy.java:119) ~[classes/:?]
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
          ... 1 more 

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            Heinrich Roman
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: