Flink / FLINK-26549

INSERT INTO with VALUES leads to wrong type inference with nested types


Details

    Description

      While working on casting, I found an interesting bug in the type inference of INSERT INTO ... VALUES. It comes from KafkaTableITCase#testKafkaSourceSinkWithMetadata (see this version in particular: https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).

      The test scenario is an INSERT INTO ... VALUES query that also writes metadata to a Kafka table, in particular the `headers` metadata column.

      The table is declared as follows:

       CREATE TABLE kafka (
        `physical_1` STRING,
        `physical_2` INT,
        `timestamp-type` STRING METADATA VIRTUAL,
        `timestamp` TIMESTAMP(3) METADATA,
        `leader-epoch` INT METADATA VIRTUAL,
        `headers` MAP<STRING, BYTES> METADATA,
        `partition` INT METADATA VIRTUAL,
        `topic` STRING METADATA VIRTUAL,
        `physical_3` BOOLEAN
      ) WITH (
         'connector' = 'kafka',
         [...]
      )
      

      The INSERT INTO query is:

      INSERT INTO kafka VALUES
      ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
      ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
      ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
      

      Note that in the first row the longest byte literal (X'C0FFEE') has length 3, while in the last row both byte literals have length 1.
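      To make the expected result concrete, here is a minimal Python model of least-restrictive type inference applied to the `headers` expression of the three rows. It is a hypothetical sketch, not Flink's or Calcite's implementation: `Char`, `Binary`, `Map` and `least_restrictive` are made-up names, and the rule that fixed-length types of different lengths widen to the varying type is an assumption (it does not affect this example). Under this model, unifying all three rows yields MAP<VARCHAR(2147483647), VARBINARY(2147483647)>, i.e. the sink type MAP<STRING, BYTES>, so the cast to the sink should be an identity cast; unifying only rows 1 and 3 yields a VARBINARY(3) value type, never BINARY(1).

```python
from dataclasses import dataclass
from functools import reduce
from typing import Union

MAX_LENGTH = 2147483647  # length used by STRING / BYTES


@dataclass(frozen=True)
class Char:
    """Hypothetical model of CHAR(n) (varying=False) or VARCHAR(n) (varying=True)."""
    length: int
    varying: bool
    nullable: bool = True


@dataclass(frozen=True)
class Binary:
    """Hypothetical model of BINARY(n) (varying=False) or VARBINARY(n) (varying=True)."""
    length: int
    varying: bool
    nullable: bool = True


@dataclass(frozen=True)
class Map:
    """Hypothetical model of MAP<key, value>."""
    key: "SqlType"
    value: "SqlType"
    nullable: bool = True


SqlType = Union[Char, Binary, Map]


def least_restrictive(a: SqlType, b: SqlType) -> SqlType:
    """Return the narrowest type that both a and b can be cast to without data loss."""
    if type(a) is not type(b):
        raise TypeError(f"no common type for {a} and {b}")
    nullable = a.nullable or b.nullable
    if isinstance(a, Map):
        # Nested types are unified component by component.
        return Map(least_restrictive(a.key, b.key),
                   least_restrictive(a.value, b.value),
                   nullable)
    if not a.varying and not b.varying and a.length == b.length:
        # Same fixed length: keep the fixed-length type.
        return type(a)(a.length, False, nullable)
    # Assumption: every other combination widens to the varying type with the max length.
    return type(a)(max(a.length, b.length), True, nullable)


# Type of the `headers` expression in each VALUES row, as shown in the plans.
row_1 = Map(Char(2, False, False), Binary(3, True, False), nullable=False)
row_2 = Map(Char(MAX_LENGTH, True), Binary(MAX_LENGTH, True))  # CAST(NULL AS MAP<STRING, BYTES>)
row_3 = Map(Char(2, False, False), Binary(1, False, False), nullable=False)

all_rows = reduce(least_restrictive, [row_1, row_2, row_3])
rows_1_and_3 = least_restrictive(row_1, row_3)
print(all_rows)
print(rows_1_and_3)
```

      The plan below instead casts the first row to the type of the last row alone, MAP<CHAR(2) NOT NULL, BINARY(1) NOT NULL>.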

      The generated plan of this INSERT INTO is:

      == Abstract Syntax Tree ==
      LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
         +- LogicalUnion(all=[true])
            :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
            :  +- LogicalValues(tuples=[[{ 0 }]])
            :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
            :  +- LogicalValues(tuples=[[{ 0 }]])
            +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
               +- LogicalValues(tuples=[[{ 0 }]])
      
      == Optimized Physical Plan ==
      Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
         :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
         :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
         +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
            +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
      
      == Optimized Execution Plan ==
      Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
         :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
         :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Reused(reference_id=[1])
         +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
            +- Reused(reference_id=[1])
      

      As you can see, the Abstract Syntax Tree already injects a cast for `headers` (which should be unnecessary, since it ought to be an identity cast), and the Optimized Physical Plan then injects a second, nested cast into the first row:

      CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers
      

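      This makes no sense: the map values are cast first to BINARY(1) and only then to BYTES, which trims X'C0FFEE' down to X'C0' and X'BABE' down to X'BA'. The inner cast type, MAP<CHAR(2) NOT NULL, BINARY(1) NOT NULL>, matches the type of the last row's map literal rather than the least restrictive type of all rows. Removing the last row from the INSERT makes the VALUES type inference work properly: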

      == Abstract Syntax Tree ==
      LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[$3], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
         +- LogicalUnion(all=[true])
            :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
            :  +- LogicalValues(tuples=[[{ 0 }]])
            +- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
               +- LogicalValues(tuples=[[{ 0 }]])
      
      == Optimized Physical Plan ==
      Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
         :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
         +- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
            +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
      
      == Optimized Execution Plan ==
      Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
      +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
         :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
         :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
         +- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
            +- Reused(reference_id=[1])
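      The data loss caused by the nested cast in the three-row plan, compared with the single cast in the two-row plan, can be modeled in a few lines of Python. This is a hypothetical illustration, not Flink code: `truncate` and `cast_map_values` are made-up helpers that only model the truncation of values longer than the target length, assuming (as described above) that a cast to BINARY(1) trims longer values; padding of shorter values is deliberately not modeled.

```python
MAX_LENGTH = 2147483647  # length used by BYTES, i.e. VARBINARY(2147483647)


def truncate(value: bytes, length: int) -> bytes:
    """Model only the truncating part of a cast to BINARY(length) / VARBINARY(length)."""
    return value[:length]


def cast_map_values(m: dict, length: int) -> dict:
    """Apply the truncation to every map value, leaving the keys untouched."""
    return {k: truncate(v, length) for k, v in m.items()}


headers = {"k1": bytes.fromhex("C0FFEE"), "k2": bytes.fromhex("BABE")}

# First row, three-row plan: CAST(CAST(map AS MAP<CHAR(2), BINARY(1)>) AS MAP<STRING, BYTES>)
nested_cast = cast_map_values(cast_map_values(headers, 1), MAX_LENGTH)
# First row, two-row plan: a single CAST(map AS MAP<STRING, BYTES>)
single_cast = cast_map_values(headers, MAX_LENGTH)

for key in headers:
    print(key, nested_cast[key].hex(), single_cast[key].hex())
```

      Only the intermediate BINARY(1) step loses data; the final cast to BYTES cannot restore the trimmed bytes.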
      

      Attachments

        1. Least_restrictive_issue.patch
          3 kB
          Francesco Guardiani
        2. InsertIntoValuesTest.patch
          3 kB
          Francesco Guardiani
        3. FlinkTypeFactoryTest.patch
          14 kB
          Francesco Guardiani

          People

            Sergey Nuyanzin
            Francesco Guardiani (slinkydeveloper)
            Votes: 0
            Watchers: 5
