Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Fixed
Version: 1.12.0
Description
My DDL:
create table if not exists t_order(
  id int PRIMARY KEY comment 'order ID',
  timestamps bigint comment 'order creation time',
  orderInformationId string comment 'order information ID',
  userId string comment 'user ID',
  categoryId int comment 'product category',
  productId int comment 'product ID',
  price decimal(10,2) comment 'unit price',
  productCount int comment 'purchase quantity',
  priceSum decimal(10,2) comment 'order total price',
  shipAddress string comment 'merchant address',
  receiverAddress string comment 'delivery address',
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) with (
  'connector' = 'kafka',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
  'topic' = 'ods.userAnalysis.order',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset'
)
Each of the following queries runs without error:
select * from t_order
select receiverAddress from t_order
select id, timestamps, orderInformationId, userId, categoryId, productId, price, productCount, priceSum, shipAddress from t_order
But when I add the receiverAddress field to the third query, like this:
select id, timestamps, orderInformationId, userId, categoryId, productId, price, productCount, priceSum, shipAddress, receiverAddress from t_order
it throws an exception:
Exception in thread "main" org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule.
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
    at com.bugboy.analysis.AnalysisCase$.main(AnalysisCase.scala:161)
    at com.bugboy.analysis.AnalysisCase.main(AnalysisCase.scala)
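For anyone trying to reproduce this, here is a minimal sketch of how the failing statement might be driven from Scala. It is not the original AnalysisCase program: the object name ReproCase is hypothetical, the column comments are left out of the DDL on the assumption that they do not affect planning, and the Blink planner in streaming mode is assumed because the stack trace goes through StreamPlanner. One thing worth noting is that the failing query selects all eleven physical columns in their declared order and leaves out only the computed column ts, which may be why the planner ends up with a Calc that has nothing to project.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical reproduction driver; not the reporter's AnalysisCase.
object ReproCase {
  def main(args: Array[String]): Unit = {
    // Assumption: Blink planner, streaming mode (the trace goes through StreamPlanner).
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // DDL from the report, with the column comments omitted for brevity.
    tEnv.executeSql(
      """
      create table if not exists t_order(
        id int PRIMARY KEY,
        timestamps bigint,
        orderInformationId string,
        userId string,
        categoryId int,
        productId int,
        price decimal(10,2),
        productCount int,
        priceSum decimal(10,2),
        shipAddress string,
        receiverAddress string,
        ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
        WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
      ) with (
        'connector' = 'kafka',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081',
        'topic' = 'ods.userAnalysis.order',
        'properties.bootstrap.servers' = 'hostname:9092',
        'properties.group.id' = 'flink-analysis',
        'scan.startup.mode' = 'latest-offset'
      )
      """)

    // The failing query: every physical column in declared order, without ts.
    tEnv.executeSql(
      """
      select id, timestamps, orderInformationId, userId, categoryId, productId,
             price, productCount, priceSum, shipAddress, receiverAddress
      from t_order
      """).print()
  }
}
```

Running this requires the Flink Blink planner, the Kafka connector, and the Confluent Avro format on the classpath. The hostname values are the placeholders from the report. Since the trace shows the exception raised during plan translation inside executeSql, it would presumably surface before any records are read from Kafka.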