Flink / FLINK-22954

Table sink doesn't support consuming update and delete changes when using a table function that does not reference any table field


    Description

      Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafkaTableSink' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.immutable.Range.foreach(Range.scala:160)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.immutable.Range.foreach(Range.scala:160)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.Iterator$class.foreach(Iterator.scala:893)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.immutable.Range.foreach(Range.scala:160)
          at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
          at scala.collection.Iterator$class.foreach(Iterator.scala:893)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
          at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
          at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
          at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
          at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
          at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
          at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
          at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
          at org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:92)

       

      UDF code:

      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.table.annotation.DataTypeHint;
      import org.apache.flink.table.annotation.FunctionHint;
      import org.apache.flink.table.api.Types;
      import org.apache.flink.table.functions.TableFunction;
      import org.apache.flink.types.Row;

      @FunctionHint(output = @DataTypeHint("ROW<word INT>"))
      public class GenerateSeriesUdf extends TableFunction<Row> {

          // Emits one row per integer in [from, to); note the upper bound is exclusive.
          public void eval(int from, int to) {
              for (int i = from; i < to; i++) {
                  Row row = new Row(1);
                  row.setField(0, i);
                  collect(row);
              }
          }

          // Legacy type-extraction hook; redundant with the @FunctionHint above,
          // but kept as in the original report.
          @Override
          public TypeInformation<Row> getResultType() {
              return Types.ROW(Types.INT());
          }
      }
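As a side note on the loop above: `eval` uses an exclusive upper bound (`i < to`), so `GENERATE_SERIES(1, 5)` emits 1 through 4, not 5. A minimal plain-Java sketch of the same loop (no Flink dependencies; the class and method names here are illustrative, not part of the report):

```java
import java.util.ArrayList;
import java.util.List;

public class SeriesCheck {
    // Mirrors the eval() loop above: collects each value instead of
    // calling TableFunction.collect(). Upper bound is exclusive.
    static List<Integer> generateSeries(int from, int to) {
        List<Integer> out = new ArrayList<>();
        for (int i = from; i < to; i++) {
            out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        // GENERATE_SERIES(1, 5) as written emits 1, 2, 3, 4
        System.out.println(generateSeries(1, 5));
    }
}
```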
      
      

       

      A plain `JOIN` works; a `LEFT JOIN` fails with the exception above.

       

       

      CREATE TABLE kafkaTableSource (
          name string,
          age int,
          sex string,
          address string,
          pt as PROCTIME()
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'hehuiyuan1',
          'scan.startup.mode' = 'latest-offset',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.client.id' = 'test-consumer-group',
          'properties.group.id' = 'test-consumer-group',
          'format' = 'json'
      );
      
      CREATE TABLE kafkaTableSink (
          name string,
          sname string,
          sno string,
          sclass string,
          address string
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'hehuiyuan2',
          'properties.bootstrap.servers' = 'localhost:9092',
          'format' = 'json'
      );

      INSERT INTO kafkaTableSink
      SELECT name, name, name, name, word
      FROM kafkaTableSource
      LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1,5)) AS T(word) ON TRUE;
      

       

       

      // UDF called with constant arguments only: the join has two inputs
      optimize result:
      Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, name0, name1, name2, word])
      +- Calc(select=[name, name AS name0, name AS name1, name AS name2, word])
         +- Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[single])
            :  +- Calc(select=[name])
            :     +- TableSourceScan(table=[[default_catalog, default_database, kafkaTableSource]], fields=[name, age, sex, address])
            +- Exchange(distribution=[single])
               +- Correlate(invocation=[GENERATE_SERIES(1, 5)], correlate=[table(GENERATE_SERIES(1,5))], select=[word], rowType=[RecordType:peek_no_expand(INTEGER word)], joinType=[INNER])
                  +- Values(tuples=[[{  }]])
      
      // UDF that uses a table field: a single-input Correlate, no join
      optimize result:
       Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, name0, name1, name2, province])
      +- Calc(select=[name, name AS name0, name AS name1, name AS name2, word AS province])
         +- Correlate(invocation=[JSON_TUPLE($cor0.address, _UTF-16LE'province')], correlate=[table(JSON_TUPLE($cor0.address,_UTF-16LE'province'))], select=[name,age,sex,address,pt,word], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) sex, VARCHAR(2147483647) address, TIME ATTRIBUTE(PROCTIME) pt, VARCHAR(2147483647) word)], joinType=[LEFT])
            +- Calc(select=[name, age, sex, address, PROCTIME() AS pt])
               +- TableSourceScan(table=[[default_catalog, default_database, kafkaTableSource]], fields=[name, age, sex, address])
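The difference matters because in the first plan the planner sees a streaming LeftOuterJoin over two inputs with NoUniqueKey, and such a join can in general emit retractions: a left row may first be emitted NULL-padded and later retracted when a match arrives, which the insert-only Kafka sink cannot consume. A plain-Java simulation of that changelog shape (illustrative only; class and method names are not from Flink):

```java
import java.util.ArrayList;
import java.util.List;

public class LeftJoinChangelogDemo {
    // Simulates the changelog a streaming LEFT JOIN can produce when a
    // left row is matched only after it was first emitted NULL-padded.
    // "+I" = insert, "-D" = delete/retraction, as in Flink's changelog rows.
    static List<String> changelogFor(String leftRow, List<Integer> lateRightRows) {
        List<String> log = new ArrayList<>();
        // Left row arrives first: no match yet, emit a NULL-padded insert.
        log.add("+I[" + leftRow + ", null]");
        boolean padded = true;
        for (int right : lateRightRows) {
            if (padded) {
                // First match arrives: the padded row must be retracted.
                log.add("-D[" + leftRow + ", null]");
                padded = false;
            }
            log.add("+I[" + leftRow + ", " + right + "]");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(changelogFor("alice", List.of(1, 2)));
    }
}
```

An insert-only sink rejects any stream containing the "-D" row above, which is exactly the check that throws the TableException in this report, even though the right side here is a constant table.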
      

       

People

    Assignee: Wenlong Lyu (wenlong.lwl)
    Reporter: hehuiyuan (hehuiyuan)