Flink / FLINK-22082

Nested projection push down doesn't work for data such as row(array(row))

Details

    Description

      For the following job:

      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.table import TableConfig, StreamTableEnvironment
      
      config = TableConfig()
      env = StreamExecutionEnvironment.get_execution_environment()
      t_env = StreamTableEnvironment.create(env, config)
      
      source_ddl = """
          CREATE TABLE InTable (
              `ID` STRING,
              `Timestamp` TIMESTAMP(3),
              `Result` ROW(
                  `data` ROW(`value` BIGINT) ARRAY),
              WATERMARK FOR `Timestamp` AS `Timestamp`
          ) WITH (
              'connector' = 'filesystem',
              'format' = 'json',
              'path' = '/tmp/1.txt'
          )
      """
      
      sink_ddl = """
          CREATE TABLE OutTable (
              `ID` STRING,
              `value` BIGINT
          ) WITH (
              'connector' = 'print'
          )
      """
      
      t_env.execute_sql(source_ddl)
      t_env.execute_sql(sink_ddl)
      
      table = t_env.from_path('InTable')
      table \
          .select(
              table.ID,
              table.Result.data.at(1).value) \
          .execute_insert('OutTable') \
          .wait()
      

      Running this job throws the following exception:

      : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
      	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
      	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
      	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
      	at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
      	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
      	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
      	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
      	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
      	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
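
The trace shows the `MatchError` being raised in `NestedSchemaExtractor.internalVisit` while visiting a field access whose inner expression is `ITEM($2.data, 1)`, a `RexCall`. The following is only a simplified Python model of that situation, not Flink's actual code: the classes `InputRef`, `FieldAccess` and `Call` and the function `nested_path` are hypothetical stand-ins. It illustrates how a path extractor that handles only input references and field accesses fails once an array element access sits in the middle of the path.

```python
from dataclasses import dataclass
from typing import List


# Hypothetical stand-ins for the expression node types seen in the trace.
@dataclass
class InputRef:
    index: int


@dataclass
class FieldAccess:
    expr: object
    name: str


@dataclass
class Call:
    op: str
    operands: List[object]


def nested_path(node):
    """Collect the field path of a nested access, e.g. ['$2', 'data'].

    Only input references and field accesses are handled; any other
    node type is rejected, which mirrors the unmatched case in the trace.
    """
    if isinstance(node, InputRef):
        return [f"${node.index}"]
    if isinstance(node, FieldAccess):
        return nested_path(node.expr) + [node.name]
    raise TypeError(f"unsupported node in nested access: {node!r}")


# Shape of table.Result.data.at(1).value in this toy model:
# a field access on top of an ITEM call on top of a field access.
expr = FieldAccess(Call("ITEM", [FieldAccess(InputRef(2), "data"), 1]), "value")
```

In this model, `nested_path(FieldAccess(InputRef(2), "data"))` succeeds, while `nested_path(expr)` raises because the `Call` node has no matching branch.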
      

      See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details
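
The job above reads from `/tmp/1.txt`, but the report does not include the file's contents. The following is a minimal sketch for generating input that matches the `InTable` schema. The record values and the helper name `write_sample_input` are made up for illustration, and the timestamp strings assume the `json` format's default SQL-style timestamp layout.

```python
import json

# Hypothetical sample records; field names and nesting follow the InTable DDL:
# Result is a row whose `data` field is an array of rows with a BIGINT `value`.
SAMPLE_RECORDS = [
    {
        "ID": "a",
        "Timestamp": "2021-03-31 12:00:00.000",
        "Result": {"data": [{"value": 1}, {"value": 2}]},
    },
    {
        "ID": "b",
        "Timestamp": "2021-03-31 12:00:01.000",
        "Result": {"data": [{"value": 3}]},
    },
]


def write_sample_input(path):
    """Write one JSON object per line to `path`."""
    with open(path, "w", encoding="utf-8") as f:
        for record in SAMPLE_RECORDS:
            f.write(json.dumps(record) + "\n")
```

Calling `write_sample_input('/tmp/1.txt')` before submitting the job gives the source table something to read. The exception in this report is raised from a planner rule, so the input data should not affect whether it occurs.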

            People

              fsk119 Shengkai Fang
              dian.fu Dian Fu