FLINK-28861

Non-deterministic UID generation might cause issues during restore

Details

    Flink 1.15.0 and 1.15.1 generated non-deterministic UIDs for operators, which makes it difficult or impossible to restore state or upgrade to the next patch version. A new table.exec.uid.generation config option (with correct default behavior) disables setting a UID for new pipelines from non-compiled plans. Existing pipelines can set table.exec.uid.generation=ALWAYS if the 1.15.0/1.15.1 behavior was acceptable.
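A minimal configuration sketch for an existing pipeline that wants to keep the 1.15.0/1.15.1 behavior, assuming the option is set programmatically on the table environment. The class name UidGenerationConfig is hypothetical, and the snippet needs the Flink Table API bridge dependency on the classpath:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name, used only for illustration.
public class UidGenerationConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Opt back into the 1.15.0/1.15.1 behavior: always set a UID on generated operators.
        tableEnv.getConfig().set("table.exec.uid.generation", "ALWAYS");
    }
}
```

New pipelines should not need this, since the default already avoids setting a UID for non-compiled plans.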

    Description

      I want to use the savepoint mechanism to move existing jobs from one version of Flink to another, by:

      1. Stopping a job with a savepoint
      2. Creating a new job from the savepoint, on the new version.

      In Flink 1.15.1, this fails even when restoring from 1.15.1 to 1.15.1. The error below indicates that the state of one operator from the previous job could not be mapped to the new job:

      Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.

      After investigation, the problematic operator turns out to be a ChangelogNormalize operator, which I do not create explicitly. It is generated because I use tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert()) (the upsert mode matters; other modes do not fail). The resulting table is passed to an SQL query through the SQL API, which generates a pipeline like:

      ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> [my_sql_transformation] -> [my_sink]

      In previous versions of Flink, this operator seems to have always been given the same uid, so the state could be matched when starting from the savepoint. In Flink 1.15.1, a different uid is generated every time. I could not find a reliable way to set that uid manually. The only way I found was to walk backwards from the transformation:

      dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");
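
The failure mode described above can be illustrated with a toy model. This is not Flink's restore code; it is a sketch that assumes state in a savepoint is looked up purely by operator ID. The class UidRestoreSketch and its methods are hypothetical names introduced for illustration only.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

// Toy model (not Flink code): savepoint state is matched to operators by ID only.
final class UidRestoreSketch {

    /** Returns the savepoint operator IDs that have no counterpart in the new program. */
    static Set<String> unmappedOperators(Set<String> savepointIds, Set<String> newProgramIds) {
        Set<String> missing = new LinkedHashSet<>(savepointIds);
        missing.removeAll(newProgramIds);
        return missing;
    }

    /** Hypothetical planner that assigns a fresh random UID on every submission. */
    static Set<String> planWithRandomUid() {
        Set<String> ids = new LinkedHashSet<>();
        ids.add("changelog-normalize-" + UUID.randomUUID());
        return ids;
    }

    /** Hypothetical planner that assigns the same UID on every submission. */
    static Set<String> planWithStableUid() {
        Set<String> ids = new LinkedHashSet<>();
        ids.add("changelog-normalize");
        return ids;
    }

    public static void main(String[] args) {
        // Stable UIDs: every operator recorded in the savepoint is found in the new job.
        System.out.println(unmappedOperators(planWithStableUid(), planWithStableUid()));
        // Random UIDs: the savepoint's operator ID does not exist in the resubmitted job.
        System.out.println(unmappedOperators(planWithRandomUid(), planWithRandomUid()));
    }
}
```

In this model, a non-empty result corresponds to the "operator is not available in the new program" error, even though both submissions come from the same program.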

          People

            twalthr Timo Walther
            colinsmetz Colin Smetz