Flink / FLINK-35147

SinkMaterializer throws StateMigrationException when widening the field type in the output table


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Table SQL / API
    • Labels: None

    Description

      When a field type in the output table is widened, e.g. from int to bigint or from timestamp(3) to timestamp(6), SinkMaterializer fails to restore state from a savepoint with a StateMigrationException. This is unexpected, because the change is backward compatible: the new, wider type can represent every value of the old, narrower type.

      Note that the planner itself accepts such a change and plans the job without complaint.
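      The gap between "logically compatible" and "restorable" comes down to the serialized byte layout of the state. A minimal, self-contained sketch (plain Python struct packing, not Flink's actual serializer code) of why a widening that is safe at the type level still requires a state migration: the old savepoint bytes use the narrower layout, so a reader using the wider layout must first re-encode them.

      ```python
      import struct

      # Hypothetical illustration only: state written under the old schema uses a
      # 4-byte int layout; the new schema expects an 8-byte layout.
      def write_int32(values):
          return b"".join(struct.pack("<i", v) for v in values)

      def read_int64(buf):
          return [struct.unpack_from("<q", buf, off)[0]
                  for off in range(0, len(buf), 8)]

      old_state = write_int32([1, 2, 3])   # 12 bytes under the old layout
      assert len(old_state) == 12

      # Naively reading the old bytes with the new, wider layout fails:
      # 12 bytes is not even a whole number of 8-byte records.
      try:
          read_int64(old_state)
          raise AssertionError("layout mismatch went undetected")
      except struct.error:
          pass  # mismatch detected, as expected

      # A migration step fixes it: decode with the old layout, re-encode with the new.
      migrated = b"".join(
          struct.pack("<q", struct.unpack_from("<i", old_state, off)[0])
          for off in range(0, len(old_state), 4))
      assert read_int64(migrated) == [1, 2, 3]
      ```

      The bug report's point is that this kind of migration (or an equivalent compatibility path) is exactly what one would expect the restore to perform for a widening change, instead of throwing.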

      To reproduce

      1. Run the SQL below:

      CREATE TABLE ltable (
          `id` integer primary key not enforced,
          `num` int
      ) WITH (
          'connector' = 'upsert-kafka',
          'properties.bootstrap.servers' = 'kafka.test:9092',
          'key.format' = 'json',
          'value.format' = 'json',
          'topic' = 'test1'
      );
      
      CREATE TABLE rtable (
          `id` integer primary key not enforced,
          `ts` timestamp(3)
      ) WITH (
          'connector' = 'upsert-kafka',
          'properties.bootstrap.servers' = 'kafka.test:9092',
          'key.format' = 'json',
          'value.format' = 'json',
          'topic' = 'test2'
      );
      
      CREATE TABLE output (
          `id` integer primary key not enforced,
          `num` int,
          `ts` timestamp(3)
      ) WITH (
          'connector' = 'upsert-kafka',
          'properties.bootstrap.servers' = 'kafka.test:9092',
          'key.format' = 'json',
          'value.format' = 'json',
          'topic' = 'test3'
      );
      
      insert into
          `output`
      select
          ltable.id,
          num,
          ts
      from
          ltable
          join rtable on ltable.id = rtable.id
      2. Stop the job with a savepoint, then update the output table DDL:

      CREATE TABLE output (
          `id` integer primary key not enforced,
          -- changing either of the types below triggers the issue
          `num` bigint,
          `ts` timestamp(6)
      ) WITH (
          'connector' = 'upsert-kafka',
          'properties.bootstrap.servers' = 'kafka.test:9092',
          'key.format' = 'json',
          'value.format' = 'json',
          'topic' = 'test3'
      );
      

      3. Restart the job from the savepoint created in step 2. Restoring the SinkMaterializer state fails with a StateMigrationException.

      Sample screenshots

      Attachments

        1. image-2024-04-17-14-15-21-647.png
          482 kB
          Sharon Xie
        2. image-2024-04-17-14-15-35-297.png
          79 kB
          Sharon Xie

        People

            Assignee: Unassigned
            Reporter: Sharon Xie (sharonxr55)
            Votes: 0
            Watchers: 1

            Dates

              Created:
              Updated: