Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30420

NPE thrown when using window time in Table API

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • Table SQL / Runtime
    • None

    Description

      Run the following unit test.

      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
      import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.Schema;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.Tumble;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.functions.AggregateFunction;
      import org.apache.flink.table.functions.TableFunction;
      
      import org.junit.Before;
      import org.junit.Test;
      
      import java.sql.Timestamp;
      import java.time.Duration;
      import java.util.ArrayList;
      import java.util.List;
      
      import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
      import static org.apache.flink.table.api.Expressions.$;
      import static org.apache.flink.table.api.Expressions.call;
      import static org.apache.flink.table.api.Expressions.currentTimestamp;
      import static org.apache.flink.table.api.Expressions.lit;
      
      /**
       * Reproducer for the failure described in this issue: a tumbling-window aggregation that
       * selects the window's rowtime, followed by a lateral join against a table function.
       *
       * <p>According to the stack trace attached to the report, the {@code NullPointerException} is
       * raised from Calcite's {@code RelDecorrelator} while the query is being optimized, which
       * happens underneath the {@code windowedTable.execute()} call in {@link #testWindowTime()}.
       */
      public class TestWindowTime {
      
          private StreamTableEnvironment tEnv;
          private StreamExecutionEnvironment env;
      
          /** Creates a fresh stream environment (parallelism 1) and a table environment on top of it. */
          @Before
          public void before() {
              Configuration config = new Configuration();
              env = StreamExecutionEnvironment.getExecutionEnvironment(config);
              env.setParallelism(1);
              tEnv = StreamTableEnvironment.create(env);
          }
      
          /**
           * Builds the windowed aggregation plus lateral join and executes it. The test contains no
           * assertions; it only prints the schema and the result.
           */
          @Test
          public void testWindowTime() {
              // Integer source backed by a sequence generator created with (0, 30).
              // NOTE(review): the trailing arguments (1, 30l) are presumably rows-per-second and
              // total row count -- confirm against the DataGeneratorSource constructor.
              DataStream<Integer> stream =
                      env.addSource(
                                      new DataGeneratorSource<>(
                                              SequenceGenerator.intGenerator(0, 30), 1, 30l))
                              .returns(Integer.class);
              // Pair every value with the wall-clock time at which it is mapped and use that value
              // (f1) as the record timestamp; the watermark strategy is bounded out-of-orderness
              // with a 2 second bound.
              DataStream<Tuple2<Integer, Long>> streamWithTime =
                      stream.map(x -> Tuple2.of(x, System.currentTimeMillis()))
                              .returns(Types.TUPLE(Types.INT, Types.LONG))
                              .assignTimestampsAndWatermarks(
                                      WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness(
                                                      Duration.ofSeconds(2))
                                              .withTimestampAssigner(
                                                      (ctx) -> (element, recordTimestamp) -> element.f1));
              // Table schema: the two tuple fields plus a "rowtime" metadata column of type
              // TIMESTAMP_LTZ(3), whose watermark is declared with the SOURCE_WATERMARK() expression.
              Schema schema =
                      Schema.newBuilder()
                              .column("f0", DataTypes.INT())
                              .column("f1", DataTypes.BIGINT())
                              .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                              .watermark("rowtime", "SOURCE_WATERMARK()")
                              .build();
      
              Table table = tEnv.fromDataStream(streamWithTime, schema);
              // Only the rowtime column is carried into the windowed query.
              table = table.select($("rowtime"));
      
              // Group by a 5-second tumbling window on rowtime. Per window, select the rowtimes
              // gathered by UDAF, the window's own rowtime, and the current timestamp.
              Table windowedTable =
                      table.window(Tumble.over("5.seconds").on("rowtime").as("w"))
                              .groupBy($("w"))
                              .select(
                                      call(UDAF.class, $("rowtime")).as("row_times"),
                                      $("w").rowtime().as("window_time"),
                                      currentTimestamp().as("current_timestamp"));
      
              // Lateral-join SplitFunction over the collected list so that each element becomes its
              // own row, then cast that element to TIMESTAMP(3) and keep the two window-level columns.
              windowedTable =
                      windowedTable
                              .joinLateral(call(SplitFunction.class, $("row_times")).as("rowtime"))
                              .select(
                                      $("rowtime").cast(TIMESTAMP(3)).as("rowtime"),
                                      $("window_time"),
                                      $("current_timestamp"));
              windowedTable.printSchema();
              // The reported stack trace passes through TableImpl.execute, i.e. this call.
              windowedTable.execute().print();
          }
      
          /** Table function that passes every element of the given list to {@code collect}, in order. */
          public static class SplitFunction extends TableFunction<Timestamp> {
      
              /**
               * Emits one output per list element.
               *
               * @param times the timestamps to emit; iterated by index from first to last
               */
              public void eval(List<Timestamp> times) {
                  for (int i = 0; i < times.size(); i++) {
                      collect(times.get(i));
                  }
              }
          }
      
          /**
           * Aggregate function whose accumulator and result are the same {@code List<Timestamp>}: every
           * value passed to {@link #accumulate} is appended, and {@link #getValue} hands the list back.
           */
          public static class UDAF extends AggregateFunction<List<Timestamp>, List<Timestamp>> {
      
              public UDAF() {}
      
              /** Returns a new, empty {@link ArrayList} to accumulate into. */
              @Override
              public List<Timestamp> createAccumulator() {
                  return new ArrayList<>();
              }
      
              /**
               * Appends one value to the accumulator.
               *
               * @param accumulator the list being built up
               * @param num the timestamp to append (added as-is, without a null check)
               */
              public void accumulate(List<Timestamp> accumulator, Timestamp num) {
                  accumulator.add(num);
              }
      
              /** Returns the accumulator list itself (not a copy) as the aggregation result. */
              @Override
              public List<Timestamp> getValue(List<Timestamp> accumulator) {
                  return accumulator;
              }
          }
      } 

      Then the following exception occurs.

      java.lang.NullPointerException
          at org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359)
          at org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122)
          at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638)
          at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595)
          at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
          at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
          at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
          at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
          at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
          at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
          at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
          at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
          at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200)
          at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.immutable.Range.foreach(Range.scala:155)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
          at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
          at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
          at org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
          at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
          at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
       

      Attachments

        Activity

          People

            Unassigned
            Jiang Xin
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: