Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-10155

JobExecutionException when using evictor followed with map

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels:
      None

      Description

      The DataStream API encounters `JobExecutionException` when using a `map` after an event time `timeWindow` with an `evictor`. 

      The exception is thrown at `out.collect` on the `WindowFunction` and is not thrown when an `evictor` isn't used, or when not using event time semantics.

       

      The exception is: 

      org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
      
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
      at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
      at com.uber.jaeger.dependencies.DependenciesProcessorTest.testMapAfterWindowing(DependenciesProcessorTest.java:101)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
      at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
      at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:89)
      at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:86)
      at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
      at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
      at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onEventTime(EvictingWindowOperator.java:271)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
      at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
      ... 7 more
      Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
      at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
      ... 22 more
      Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
      Serialization trace:
      strategies (org.apache.flink.api.common.state.StateTtlConfig$CleanupStrategies)
      cleanupStrategies (org.apache.flink.api.common.state.StateTtlConfig)
      ttlConfig (org.apache.flink.api.common.state.ListStateDescriptor)
      evictingWindowStateDescriptor (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator)
      this$0 (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$2)
      val$function (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
      val$fromIterable (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
      ... 28 more
      Caused by: java.lang.NullPointerException
      at java.util.EnumMap$EnumMapIterator.hasNext(EnumMap.java:527)
      at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
      at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
      at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
      at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
      ... 50 more

       

      And the code is:

      @Test
      public void testMapAfterWindowing() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setParallelism(1);
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
      
          DataStreamSource<String> stringStream = env.fromElements("this", "is", "some", "data", "tomfoolery");
          SingleOutputStreamOperator<String> source = stringStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
      
          source.keyBy(new KeySelector<String, String>() {
              @Override
              public String getKey(String value) throws Exception {
                  return value.substring(0, 1);
              }
          })
                  .timeWindow(Time.milliseconds(100))
                  .evictor(CountEvictor.of(1))
                  .apply(new WindowFunction<String, Iterable<String>, String, TimeWindow>() {
                      @Override
                      public void apply(String s, TimeWindow window, Iterable<String> input, Collector<Iterable<String>> out) throws Exception {
                          out.collect(input);
                      }
                  })
                  .map(new RichMapFunction<Iterable<String>, Iterable<String>>() {
                      // Identity map function
                      @Override
                      public Iterable<String> map(Iterable<String> value) throws Exception {
                          return value;
                      }
                  })
                  .printToErr();
      
          env.execute();
      
      }

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              pr Prithvi Raj
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated: