  1. Flink
  2. FLINK-31830

Coalesce on nested fields with different nullabilities will get wrong plan

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.6
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      A test case similar to the one in FLINK-31829, differing only in that the nullable field `a.np` is changed to NOT NULL, produces a wrong plan in 1.14.x (reported by a community user):

        @Test
        def testCoalesceOnNestedColumns(): Unit = {
          val tEnv = util.tableEnv
          val tableDescriptor = TableDescriptor.forConnector("datagen")
              .schema(Schema.newBuilder
                  .column("id", DataTypes.INT.notNull)
                  .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
                  .build)
              .build
          tEnv.createTemporaryTable("t1", tableDescriptor)
          tEnv.createTemporaryTable("t2", tableDescriptor)
          val res = tEnv.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
          res.print()
        }
      
      == Abstract Syntax Tree ==
      LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
      +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
         +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
            :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
            +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
      

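      The top project in the AST is wrong: `LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`. The expression `c1=[CAST($1.np):INTEGER]`, which corresponds to `COALESCE(a.a.np, b.a.np) c1`, is incorrect: it keeps only the first argument and drops the fallback `b.a.np` (`$3.np`), even though column `a` is declared nullable.
      However, this works fine when the tables are created with SQL DDL: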

        @Test
        def testCoalesceOnNestedColumns2(): Unit = {
          val tEnv = util.tableEnv
          tEnv.executeSql(
            s"""
               |create temporary table t1 (
               |  id int not null,
               |  a row<np int not null>
               |) with (
               | 'connector' = 'datagen'
               |)
               |""".stripMargin)
          tEnv.executeSql(
            s"""
               |create temporary table t2 (
               |  id int not null,
               |  a row<np int not null>
               |) with (
               | 'connector' = 'datagen'
               |)
               |""".stripMargin)
          val res = tEnv.executeSql(
            "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
          res.print()
        }
      

      Since 1.15, COALESCE is a new built-in function, and the AST looks correct in 1.15+; before 1.15 it was rewritten as `CASE WHEN`.
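
      To make the difference concrete, here is a minimal plain-Scala sketch (no Flink dependency) of what `c1` should evaluate to versus what the reported plan computes. It only models the SQL semantics with `Option`; it is not the planner's code, and every name in it (`CoalesceSketch`, `Nested`, `coalesceAsCaseWhen`, `reportedC1`) is hypothetical.

```scala
// Plain-Scala model of column c1 from the query above; not planner code.
// All names here are hypothetical and exist only for illustration.
object CoalesceSketch {
  // Row type of column `a`: the field `np` is NOT NULL inside the row.
  final case class Nested(np: Int)

  // The row itself is nullable, so `a.a.np` is effectively nullable:
  // it evaluates to NULL (None) whenever `a.a` is NULL.
  def np(a: Option[Nested]): Option[Int] = a.map(_.np)

  // COALESCE(x, y) written as the equivalent CASE WHEN:
  //   CASE WHEN x IS NOT NULL THEN x ELSE y END
  def coalesceAsCaseWhen(x: Option[Int], y: Option[Int]): Option[Int] =
    if (x.isDefined) x else y

  // What the reported plan computes for c1: CAST($1.np), i.e. the first
  // argument only. The second argument is ignored.
  def reportedC1(x: Option[Int], y: Option[Int]): Option[Int] = x

  def main(args: Array[String]): Unit = {
    val left: Option[Nested] = None              // t1.a IS NULL, allowed by the WHERE clause
    val right: Option[Nested] = Some(Nested(7))  // matching row from t2
    println(coalesceAsCaseWhen(np(left), np(right))) // Some(7)
    println(reportedC1(np(left), np(right)))         // None
  }
}
```

      For a row where `a.a` is NULL and the join finds a match, the `CASE WHEN` form falls back to `b.a.np`, while the reduced `CAST($1.np)` form yields NULL.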

      Attachments

        1. image-2023-06-09-15-21-13-720.png
          998 kB
          Jane Chan
        2. image-2023-06-09-15-06-01-322.png
          604 kB
          Jane Chan

            People

              Assignee: Jane Chan (qingyue)
              Reporter: lincoln lee (lincoln.86xy)
              Votes: 1
              Watchers: 12
