Details
-
Bug
-
Status: Closed
-
Critical
-
Resolution: Fixed
-
1.12.0, 1.13.0
Description
For the following job:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableConfig, StreamTableEnvironment config = TableConfig() env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env, config) source_ddl = """ CREATE TABLE InTable ( `ID` STRING, `Timestamp` TIMESTAMP(3), `Result` ROW( `data` ROW(`value` BIGINT) ARRAY), WATERMARK FOR `Timestamp` AS `Timestamp` ) WITH ( 'connector' = 'filesystem', 'format' = 'json', 'path' = '/tmp/1.txt' ) """ sink_ddl = """ CREATE TABLE OutTable ( `ID` STRING, `value` BIGINT ) WITH ( 'connector' = 'print' ) """ t_env.execute_sql(source_ddl) t_env.execute_sql(sink_ddl) table = t_env.from_path('InTable') table \ .select( table.ID, table.Result.data.at(1).value) \ .execute_insert('OutTable') \ .wait()
It will thrown the following exception:
: scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269) at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala) at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155) at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details
Attachments
Issue Links
- is related to
-
FLINK-21746 flink sql fields in row access error about scalarfunction
- Closed
- links to