Flink / FLINK-20314

Empty Calc is not removed by CalcRemoveRule


Details

    Description

      My DDL:

      create table if not exists t_order (
        id int PRIMARY KEY comment 'order ID',
        timestamps bigint comment 'order creation time',
        orderInformationId string comment 'order information ID',
        userId string comment 'user ID',
        categoryId int comment 'product category',
        productId int comment 'product ID',
        price decimal(10,2) comment 'unit price',
        productCount int comment 'purchase quantity',
        priceSum decimal(10,2) comment 'order total price',
        shipAddress string comment 'merchant address',
        receiverAddress string comment 'delivery address',
        ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
      ) with (
        'connector' = 'kafka',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
        'topic' = 'ods.userAnalysis.order',
        'properties.bootstrap.servers' = 'hostname:9092',
        'properties.group.id' = 'flink-analysis',
        'scan.startup.mode' = 'latest-offset'
      )

      Each of the following queries runs without error:

      select * from t_order

      select receiverAddress from t_order

      select
        id,
        timestamps,
        orderInformationId,
        userId,
        categoryId,
        productId,
        price,
        productCount,
        priceSum,
        shipAddress
      from t_order

      But when I add the receiverAddress field to the third query, like this:

      select
      id,
      timestamps,
      orderInformationId,
      userId,
      categoryId,
      productId,
      price,
      productCount,
      priceSum,
      shipAddress,
      receiverAddress
      from t_order
      

      it throws an exception:

      Exception in thread "main" org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule.
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
          at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
          at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
          at scala.collection.Iterator.foreach(Iterator.scala:937)
          at scala.collection.Iterator.foreach$(Iterator.scala:937)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
          at scala.collection.IterableLike.foreach(IterableLike.scala:70)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableLike.map(TraversableLike.scala:233)
          at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
          at com.bugboy.analysis.AnalysisCase$.main(AnalysisCase.scala:161)
          at com.bugboy.analysis.AnalysisCase.main(AnalysisCase.scala)
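
      As I read the message, the code generator refuses a Calc that has no filter and only forwards its input fields unchanged and in order. Below is a rough Python sketch of that condition. It is an illustration only, not Flink's actual code: the function name, the index-list representation of a projection, and the string condition are all assumptions made for the example.

```python
from typing import List, Optional


def is_redundant_calc(projection: List[int],
                      input_field_count: int,
                      condition: Optional[str]) -> bool:
    """Return True when a Calc has no filter and only forwards its input.

    projection[i] is the index of the input field emitted as output field i.
    This is a simplification: real projections can be arbitrary expressions.
    """
    has_filter = condition is not None
    # Identity projection: every input field, unchanged, in its original order.
    is_identity = projection == list(range(input_field_count))
    return not has_filter and is_identity


# Forwarding all three input fields in order with no filter: nothing to compute.
print(is_redundant_calc([0, 1, 2], 3, None))
# Dropping a field counts as a useful projection.
print(is_redundant_calc([0, 1], 3, None))
# A filter makes the Calc useful even with an identity projection.
print(is_redundant_calc([0, 1, 2], 3, "price > 0"))
```

      Under this reading, the failing query is one where the planner ended up with such a no-op Calc that the optimizer did not remove. Why it survives optimization is the open question of this issue.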

            People

              fsk119 Shengkai Fang
              bugboy bugboy