Apache Storm / STORM-3611

IllegalArgumentException thrown on windowed bolt

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: storm-client
    • Labels: None
    • Environment: Ubuntu Linux 16.04, OpenJDK version 1.8.0_242

      Description

      The following topology uses a single `BaseWindowedBolt`:

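      import org.apache.storm.Config;
      import org.apache.storm.topology.ConfigurableTopology;
      import org.apache.storm.topology.TopologyBuilder;
      import org.apache.storm.topology.base.BaseWindowedBolt;

      public class GroupMean extends ConfigurableTopology {

        public static void main(String[] args) throws Exception {
          ConfigurableTopology.start(new GroupMean(), args);
        }

        @Override
        protected int run(String[] args) throws Exception {
          // Event-time tumbling window of 100 ms; field 0 of each tuple carries the timestamp.
          BaseWindowedBolt bolt = new GroupMeanBolt()
              .withTimestampExtractor(x -> x.getLong(0))
              .withTumblingWindow(BaseWindowedBolt.Duration.of(100));

          // One spout feeding one windowed bolt. RandomGroupNumberSpout is the
          // reporter's own class; its source is not included in this report.
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("groups", new RandomGroupNumberSpout(), 1);
          builder.setBolt("mean", bolt, 1).shuffleGrouping("groups");

          Config config = new Config();
          config.put("topology.name", "group-mean-topo");
          return submit("group-mean-topo", config, builder);
        }
      }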
      

      where `GroupMeanBolt` is defined as:

      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.Map;

      import org.apache.storm.task.OutputCollector;
      import org.apache.storm.task.TopologyContext;
      import org.apache.storm.topology.base.BaseWindowedBolt;
      import org.apache.storm.tuple.Tuple;
      import org.apache.storm.windowing.TupleWindow;

      public class GroupMeanBolt extends BaseWindowedBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context,
                            OutputCollector collector) {
          this.collector = collector;
        }

        @Override
        public void execute(TupleWindow inputWindow) {
          if (inputWindow.get().size() > 0) {
            // Per-key running sum and tuple count for the current window.
            Map<Integer, Double> sum = new HashMap<>();
            Map<Integer, Integer> histogram = new HashMap<>();
            for (Tuple t : inputWindow.get()) {
              Integer key = t.getIntegerByField("key");
              Double value = t.getDoubleByField("value");
              sum.compute(key, (k, v) -> v == null ? value : v + value);
              histogram.compute(key, (k, v) -> v == null ? 1 : v + 1);
            }
            // Mean per key = sum / count.
            Map<Integer, Number> result = new HashMap<>(sum.size());
            for (Map.Entry<Integer, Double> e : sum.entrySet()) {
              result.put(e.getKey(), e.getValue() / histogram.get(e.getKey()));
            }
            // Output tuple: window start, window end, "key:mean" pairs joined by commas.
            ArrayList<Object> out = new ArrayList<>(3);
            out.add(inputWindow.getStartTimestamp());
            out.add(inputWindow.getEndTimestamp());
            ArrayList<String> tokens = new ArrayList<>(result.size());
            for (Map.Entry<Integer, Number> e : result.entrySet()) {
              String token = String.join(":", e.getKey().toString(), e.getValue().toString());
              tokens.add(token);
            }
            out.add(String.join(",", tokens));
            // Emits on the default stream; per the stack trace, this is the call that throws.
            collector.emit(out);
          }
        }
      }
      

      At runtime, an IllegalArgumentException is thrown because the "default" stream is not recognized. Stack trace:

      2020-03-27 15:42:42.705 o.a.s.w.WaterMarkEventGenerator watermark-event-generator-0 [ERROR] Failed while processing watermark event
      java.lang.IllegalArgumentException: Unknown stream ID: default
              at org.apache.storm.daemon.Task.getOutgoingTasks(Task.java:164) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:88) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor$WindowedOutputCollector.emit(WindowedBoltExecutor.java:403) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:88) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at gr.katsip.spear.GroupMeanBolt.execute(GroupMeanBolt.java:51) ~[stormjar.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor.boltExecute(WindowedBoltExecutor.java:370) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:363) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:156) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WatermarkTimeTriggerPolicy.handleWaterMarkEvent(WatermarkTimeTriggerPolicy.java:73) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WatermarkTimeTriggerPolicy.track(WatermarkTimeTriggerPolicy.java:43) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.track(WindowManager.java:185) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.add(WindowManager.java:121) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WaterMarkEventGenerator.run(WaterMarkEventGenerator.java:88) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
      2020-03-27 15:42:42.715 o.a.s.w.WaterMarkEventGenerator Thread-14-mean-executor[3, 3] [ERROR] Got exception
      java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Unknown stream ID: default
              at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_242]
              at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242]
              at org.apache.storm.windowing.WaterMarkEventGenerator.checkFailures(WaterMarkEventGenerator.java:115) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WaterMarkEventGenerator.track(WaterMarkEventGenerator.java:79) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:308) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:234) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.Executor.accept(Executor.java:275) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:131) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.utils.JCQueue.consume(JCQueue.java:111) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:171) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:158) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.utils.Utils$1.run(Utils.java:392) [storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
      Caused by: java.lang.IllegalArgumentException: Unknown stream ID: default
              at org.apache.storm.daemon.Task.getOutgoingTasks(Task.java:164) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:88) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor$WindowedOutputCollector.emit(WindowedBoltExecutor.java:403) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:88) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at gr.katsip.spear.GroupMeanBolt.execute(GroupMeanBolt.java:51) ~[stormjar.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor.boltExecute(WindowedBoltExecutor.java:370) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:363) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:156) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WatermarkTimeTriggerPolicy.handleWaterMarkEvent(WatermarkTimeTriggerPolicy.java:73) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WatermarkTimeTriggerPolicy.track(WatermarkTimeTriggerPolicy.java:43) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.track(WindowManager.java:185) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WindowManager.add(WindowManager.java:121) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at org.apache.storm.windowing.WaterMarkEventGenerator.run(WaterMarkEventGenerator.java:88) ~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_242]
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_242]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_242]
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_242]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_242]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_242]
              ... 1 more
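
      The issue was resolved as "Not A Problem", and the bolt as posted does not override declareOutputFields, so one plausible reading is that the "default" stream was never declared for the "mean" component. The sketch below is a simplified, hypothetical model of that lookup using only the JDK. The class StreamLookupSketch and its methods are illustrative assumptions, not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of per-stream routing (NOT Storm's code):
 * a component may only emit on streams it has declared beforehand.
 */
final class StreamLookupSketch {
    // Declared stream id -> tuples delivered on that stream.
    private final Map<String, List<List<Object>>> declaredStreams = new HashMap<>();

    /** Stand-in for declaring output fields: registers the stream id. */
    void declareStream(String streamId) {
        declaredStreams.putIfAbsent(streamId, new ArrayList<>());
    }

    /** Delivers a tuple, or fails if the stream id was never declared. */
    void emit(String streamId, List<Object> tuple) {
        List<List<Object>> sink = declaredStreams.get(streamId);
        if (sink == null) {
            throw new IllegalArgumentException("Unknown stream ID: " + streamId);
        }
        sink.add(tuple);
    }

    /** Number of tuples delivered on the given stream (0 if undeclared). */
    int emittedCount(String streamId) {
        List<List<Object>> sink = declaredStreams.get(streamId);
        return sink == null ? 0 : sink.size();
    }

    public static void main(String[] args) {
        StreamLookupSketch component = new StreamLookupSketch();
        List<Object> tuple = Arrays.<Object>asList(0L, 100L, "1:2.5");

        try {
            // No stream declared yet, so this emit is rejected.
            component.emit("default", tuple);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // After declaring the stream, the same emit is accepted.
        component.declareStream("default");
        component.emit("default", tuple);
        System.out.println("emitted: " + component.emittedCount("default"));
    }
}
```

      If this reading is correct, the corresponding step in the real bolt would be to override declareOutputFields(OutputFieldsDeclarer declarer) in GroupMeanBolt and call declarer.declare(new Fields(...)) with three field names matching the emitted tuple (window start, window end, joined means). The actual field names are left to the author; none are given in this report.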
      

            People

            • Assignee: Unassigned
            • Reporter: nikos.katsip Nikos Katsipoulakis