Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Version: 1.18.0
Description
How to Reproduce
First, add a test that dumps the compiled plan to a JSON file.
@Test
public void debug() {
    tableEnv.executeSql(
            "create table foo (f0 int, f1 string) with ('connector' = 'datagen')");
    tableEnv.executeSql(
            "create table bar (f0 int, f1 string) with ('connector' = 'print')");
    // Compile the INSERT statement and write the plan as JSON.
    tableEnv.compilePlanSql("insert into bar select * from foo")
            .writeToFile(new File("/path/to/debug.json"));
}
The resulting JSON plan is as follows:
{
  "flinkVersion" : "1.18",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`foo`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "f0",
              "dataType" : "INT"
            }, {
              "name" : "f1",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "datagen"
          }
        }
      }
    },
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`bar`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "f0",
              "dataType" : "INT"
            }, {
              "name" : "f1",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "print"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
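Then upgrade StreamExecSink to a new version by adding a second @ExecNodeMetadata annotation with version = 2 and minPlanVersion = FlinkVersion.v1_18: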
@ExecNodeMetadata(
        name = "stream-exec-sink",
        version = 1,
        consumedOptions = {
            "table.exec.sink.not-null-enforcer",
            "table.exec.sink.type-length-enforcer",
            "table.exec.sink.upsert-materialize",
            "table.exec.sink.keyed-shuffle"
        },
        producedTransformations = {
            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
            CommonExecSink.PARTITIONER_TRANSFORMATION,
            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
            CommonExecSink.SINK_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
// New version: same options and transformations, but requires a 1.18 plan.
@ExecNodeMetadata(
        name = "stream-exec-sink",
        version = 2,
        consumedOptions = {
            "table.exec.sink.not-null-enforcer",
            "table.exec.sink.type-length-enforcer",
            "table.exec.sink.upsert-materialize",
            "table.exec.sink.keyed-shuffle"
        },
        producedTransformations = {
            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
            CommonExecSink.PARTITIONER_TRANSFORMATION,
            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
            CommonExecSink.SINK_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_18,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
    // existing class body, unchanged
}
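Declaring two versions on one class works because @ExecNodeMetadata is a repeatable annotation. The plain-Java sketch below illustrates what a "newest version" lookup over repeatable annotations returns for such a class. NodeMetadata, NodeMetadataList, SinkNode and newestTypeId are hypothetical stand-ins, not Flink classes, and Flink's actual lookup may differ in detail.

```java
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Comparator;

// Entry point declared first so the file also runs with the single-file source launcher.
class NewestVersion {
    // Returns "name_version" for the highest version declared on the class,
    // i.e. the choice made when the context is always taken from the newest annotation.
    static String newestTypeId(Class<?> nodeClass) {
        NodeMetadata newest = Arrays.stream(nodeClass.getAnnotationsByType(NodeMetadata.class))
                .max(Comparator.comparingInt(NodeMetadata::version))
                .orElseThrow(() -> new IllegalArgumentException("no metadata on " + nodeClass));
        return newest.name() + "_" + newest.version();
    }

    public static void main(String[] args) {
        // Prints "stream-exec-sink_2": this lookup never selects version 1.
        System.out.println(newestTypeId(SinkNode.class));
    }
}

// Hypothetical stand-in for @ExecNodeMetadata (not Flink's annotation).
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(NodeMetadataList.class)
@interface NodeMetadata {
    String name();

    int version();
}

// Container annotation required by @Repeatable.
@Retention(RetentionPolicy.RUNTIME)
@interface NodeMetadataList {
    NodeMetadata[] value();
}

// Hypothetical node class carrying two versions, like the upgraded StreamExecSink.
@NodeMetadata(name = "stream-exec-sink", version = 1)
@NodeMetadata(name = "stream-exec-sink", version = 2)
class SinkNode {}
```

Any code path that derives the serialized type only from such a lookup will emit version 2, regardless of which version a previously compiled plan recorded.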
Next, load the previously compiled plan and print it as JSON text:
tableEnv.loadPlan(PlanReference.fromFile("/path/to/debug.json")).printJsonString();
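The SerDe round trip is no longer idempotent: once StreamExecSink has a version 2, the reloaded plan prints the sink node as "stream-exec-sink_2", although the original plan was compiled with "stream-exec-sink_1".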
{
  "flinkVersion" : "1.18",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`foo`"
      }
    },
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, foo]], fields=[f0, f1])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-sink_2",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`bar`"
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.bar], fields=[f0, f1])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}
Root Cause
ExecNodeBase#getContextFromAnnotation always uses the newest ExecNode version during serialization. As a result, although the deserialized CompiledPlan object is correct, #printJsonString serializes each node with a newly created context that carries the newest version instead of the version recorded in the plan.
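To make the failure mode concrete, here is a minimal plain-Java model of the round trip. It assumes a type id of the form name_version and reduces a node to its context; BuggyRoundTrip, Context, Node and NEWEST are hypothetical names for illustration, not Flink's ExecNodeBase or ExecNodeContext.

```java
import java.util.Map;

// Hypothetical model of the reported SerDe round trip; not Flink code.
class BuggyRoundTrip {
    // Newest declared version per node name, standing in for the class annotations.
    static final Map<String, Integer> NEWEST = Map.of("stream-exec-sink", 2);

    // Stand-in for a node context, rendered as "name_version" in the JSON plan.
    static final class Context {
        final String name;
        final int version;

        Context(String name, int version) {
            this.name = name;
            this.version = version;
        }

        static Context parse(String typeId) {
            int sep = typeId.lastIndexOf('_');
            return new Context(typeId.substring(0, sep), Integer.parseInt(typeId.substring(sep + 1)));
        }

        String typeId() {
            return name + "_" + version;
        }
    }

    static final class Node {
        final Context context; // context read from the JSON plan

        Node(Context context) {
            this.context = context;
        }

        // Reported behavior: serialization ignores the deserialized context
        // and builds a new one from the newest declared version.
        String serializeType() {
            return new Context(context.name, NEWEST.get(context.name)).typeId();
        }
    }

    // Deserialize a type id, then serialize it again.
    static String roundTrip(String typeId) {
        return new Node(Context.parse(typeId)).serializeType();
    }

    public static void main(String[] args) {
        String before = "stream-exec-sink_1";
        String after = roundTrip(before);
        System.out.println(before + " -> " + after); // stream-exec-sink_1 -> stream-exec-sink_2
        System.out.println("idempotent: " + before.equals(after)); // idempotent: false
    }
}
```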
Suggested Fix
If the member variable `isCompiled` is true, #getContextFromAnnotation should return the context that was read from the JSON plan instead of instantiating a new one from the newest annotation.
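A minimal plain-Java sketch of the suggested fix, using a simplified, hypothetical model: only the isCompiled flag comes from the issue, while FixedRoundTrip, Context, Node, NEWEST and freshTypeId are illustrative names, not Flink classes. Restored nodes keep the context read from the plan; nodes created by the planner still get the newest version.

```java
import java.util.Map;

// Hypothetical model of the suggested fix; not Flink code.
class FixedRoundTrip {
    // Newest declared version per node name, standing in for the class annotations.
    static final Map<String, Integer> NEWEST = Map.of("stream-exec-sink", 2);

    // Stand-in for a node context, rendered as "name_version" in the JSON plan.
    static final class Context {
        final String name;
        final int version;

        Context(String name, int version) {
            this.name = name;
            this.version = version;
        }

        static Context parse(String typeId) {
            int sep = typeId.lastIndexOf('_');
            return new Context(typeId.substring(0, sep), Integer.parseInt(typeId.substring(sep + 1)));
        }

        String typeId() {
            return name + "_" + version;
        }
    }

    static final class Node {
        final String name;
        final Context context;    // set only when restored from a JSON plan
        final boolean isCompiled; // true when the node was restored from a JSON plan

        // Node restored from a compiled plan.
        Node(Context restored) {
            this.name = restored.name;
            this.context = restored;
            this.isCompiled = true;
        }

        // Node freshly created by the planner.
        Node(String name) {
            this.name = name;
            this.context = null;
            this.isCompiled = false;
        }

        // Suggested fix: reuse the deserialized context when isCompiled is true;
        // otherwise fall back to the newest declared version.
        Context getContext() {
            return isCompiled ? context : new Context(name, NEWEST.get(name));
        }
    }

    static String roundTrip(String typeId) {
        return new Node(Context.parse(typeId)).getContext().typeId();
    }

    static String freshTypeId(String name) {
        return new Node(name).getContext().typeId();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("stream-exec-sink_1")); // stream-exec-sink_1
        System.out.println(freshTypeId("stream-exec-sink")); // stream-exec-sink_2
    }
}
```

The split keeps version upgrades effective for newly compiled plans while letting a restored plan serialize back to exactly the versions it was loaded with.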
Issue Links
- Discovered while testing: FLINK-31791 FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration (Closed)
- Is related to: FLINK-25217 FLIP-190: Support Version Upgrades for Table API & SQL Programs (Open)