FLINK-31884: Upgrading an ExecNode to a new version causes old serialized plans to fail the JSON SerDe round trip


    Description

      How to Reproduce

      First, add a test that dumps the compiled plan as JSON.

      @Test
      public void debug() {
          tableEnv.executeSql("create table foo (f0 int, f1 string) with ('connector' = 'datagen')");
          tableEnv.executeSql("create table bar (f0 int, f1 string) with ('connector' = 'print')");
          // Compile the INSERT statement and persist its execution plan as JSON.
          tableEnv.compilePlanSql("insert into bar select * from foo")
                  .writeToFile(new File("/path/to/debug.json"));
      }
      

      The dumped JSON plan is as follows. Note the sink node's type, stream-exec-sink_1: the _1 suffix encodes the ExecNode version.

      {
        "flinkVersion" : "1.18",
        "nodes" : [ {
          "id" : 1,
          "type" : "stream-exec-table-source-scan_1",
          "scanTableSource" : {
            "table" : {
              "identifier" : "`default_catalog`.`default_database`.`foo`",
              "resolvedTable" : {
                "schema" : {
                  "columns" : [ {
                    "name" : "f0",
                    "dataType" : "INT"
                  }, {
                    "name" : "f1",
                    "dataType" : "VARCHAR(2147483647)"
                  } ],
                  "watermarkSpecs" : [ ]
                },
                "partitionKeys" : [ ],
                "options" : {
                  "connector" : "datagen"
                }
              }
            }
          },
          "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
          "inputProperties" : [ ]
        }, {
          "id" : 2,
          "type" : "stream-exec-sink_1",
          "configuration" : {
            "table.exec.sink.keyed-shuffle" : "AUTO",
            "table.exec.sink.not-null-enforcer" : "ERROR",
            "table.exec.sink.type-length-enforcer" : "IGNORE",
            "table.exec.sink.upsert-materialize" : "AUTO"
          },
          "dynamicTableSink" : {
            "table" : {
              "identifier" : "`default_catalog`.`default_database`.`bar`",
              "resolvedTable" : {
                "schema" : {
                  "columns" : [ {
                    "name" : "f0",
                    "dataType" : "INT"
                  }, {
                    "name" : "f1",
                    "dataType" : "VARCHAR(2147483647)"
                  } ],
                  "watermarkSpecs" : [ ]
                },
                "partitionKeys" : [ ],
                "options" : {
                  "connector" : "print"
                }
              }
            }
          },
          "inputChangelogMode" : [ "INSERT" ],
          "inputProperties" : [ {
            "requiredDistribution" : {
              "type" : "UNKNOWN"
            },
            "damBehavior" : "PIPELINED",
            "priority" : 0
          } ],
          "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
        } ],
        "edges" : [ {
          "source" : 1,
          "target" : 2,
          "shuffle" : {
            "type" : "FORWARD"
          },
          "shuffleMode" : "PIPELINED"
        } ]
      }
      

      Then upgrade StreamExecSink to a new version by adding a second @ExecNodeMetadata annotation with version = 2:

      @ExecNodeMetadata(
              name = "stream-exec-sink",
              version = 1,
              consumedOptions = {
                  "table.exec.sink.not-null-enforcer",
                  "table.exec.sink.type-length-enforcer",
                  "table.exec.sink.upsert-materialize",
                  "table.exec.sink.keyed-shuffle"
              },
              producedTransformations = {
                  CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
                  CommonExecSink.PARTITIONER_TRANSFORMATION,
                  CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
                  CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
                  CommonExecSink.SINK_TRANSFORMATION
              },
              minPlanVersion = FlinkVersion.v1_15,
              minStateVersion = FlinkVersion.v1_15)
      @ExecNodeMetadata(
              name = "stream-exec-sink",
              version = 2,
              consumedOptions = {
                  "table.exec.sink.not-null-enforcer",
                  "table.exec.sink.type-length-enforcer",
                  "table.exec.sink.upsert-materialize",
                  "table.exec.sink.keyed-shuffle"
              },
              producedTransformations = {
                  CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
                  CommonExecSink.PARTITIONER_TRANSFORMATION,
                  CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
                  CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
                  CommonExecSink.SINK_TRANSFORMATION
              },
              minPlanVersion = FlinkVersion.v1_18,
              minStateVersion = FlinkVersion.v1_15)
      public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
          // class body unchanged, omitted for brevity
      }
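
      For reference, the type string written to the JSON plan is the annotation's name joined with its version, which is why the node gets re-serialized as stream-exec-sink_2 once version 2 becomes the newest annotation. A hedged illustration (metadata is a hypothetical variable holding an @ExecNodeMetadata instance, not the actual Flink helper):

      // Illustration only: how the plan's "type" field relates to the
      // @ExecNodeMetadata fields, e.g. "stream-exec-sink" + 2 -> "stream-exec-sink_2".
      String type = metadata.name() + "_" + metadata.version();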
      

      Then load the previously persisted plan and print it as JSON text:

      tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString();
      

      The SerDe round trip is no longer idempotent: in the re-serialized plan below, the sink node's type has changed from stream-exec-sink_1 to stream-exec-sink_2, even though the plan was compiled with version 1. A round-trip check demonstrating this is sketched after the JSON.

      {
        "flinkVersion" : "1.18",
        "nodes" : [ {
          "id" : 1,
          "type" : "stream-exec-table-source-scan_1",
          "scanTableSource" : {
            "table" : {
              "identifier" : "`default_catalog`.`default_database`.`foo`"
            }
          },
          "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
          "inputProperties" : [ ]
        }, {
          "id" : 2,
          "type" : "stream-exec-sink_2",
          "configuration" : {
            "table.exec.sink.keyed-shuffle" : "AUTO",
            "table.exec.sink.not-null-enforcer" : "ERROR",
            "table.exec.sink.type-length-enforcer" : "IGNORE",
            "table.exec.sink.upsert-materialize" : "AUTO"
          },
          "dynamicTableSink" : {
            "table" : {
              "identifier" : "`default_catalog`.`default_database`.`bar`"
            }
          },
          "inputChangelogMode" : [ "INSERT" ],
          "inputProperties" : [ {
            "requiredDistribution" : {
              "type" : "UNKNOWN"
            },
            "damBehavior" : "PIPELINED",
            "priority" : 0
          } ],
          "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
          "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
        } ],
        "edges" : [ {
          "source" : 1,
          "target" : 2,
          "shuffle" : {
            "type" : "FORWARD"
          },
          "shuffleMode" : "PIPELINED"
        } ]
      }
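
      A quick way to demonstrate the lost idempotence is a round-trip assertion. A sketch, assuming the same tableEnv scaffolding as the debug test above and AssertJ for the assertion:

      @Test
      public void roundTrip() {
          // Load the plan that was compiled and persisted before the upgrade.
          CompiledPlan restored =
                  tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json"));
          // Fails after the upgrade: the re-serialized plan contains
          // "stream-exec-sink_2" although "stream-exec-sink_1" was persisted.
          assertThat(restored.asJsonString()).contains("stream-exec-sink_1");
      }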
      

      Root Cause

      ExecNodeBase#getContextFromAnnotation always builds the SerDe context from the newest ExecNode version. As a result, although the deserialized CompiledPlan object itself is correct, re-serializing it (printJsonString above) creates a new context carrying the newest version.
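
      A minimal sketch of the problematic logic (simplified; apart from getContextFromAnnotation, names and signatures are assumptions, not the actual Flink source):

      // Simplified sketch: the context is always rebuilt from the class
      // annotations, which resolve to the newest version (here 2), ignoring
      // the version (1) stored in the serialized plan.
      protected ExecNodeContext getContextFromAnnotation() {
          return ExecNodeContext.newContext(this.getClass()).withId(getId());
      }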
       

      Suggested Fix

      If the member variable `isCompiled` is true, #getContextFromAnnotation should return the context that was read from the JSON plan instead of instantiating a new one, as sketched below.
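
      A minimal sketch of that change, assuming ExecNodeBase keeps the deserialized context in a field named context next to the isCompiled flag (field names are assumptions):

      protected ExecNodeContext getContextFromAnnotation() {
          // For nodes restored via loadPlan, reuse the context (and thus the
          // version) read from the JSON plan; only freshly compiled plans get
          // a new context derived from the newest annotation.
          return isCompiled
                  ? context
                  : ExecNodeContext.newContext(this.getClass()).withId(getId());
      }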
       

People

  Assignee: Jane Chan
  Reporter: Jane Chan