Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
1.6.2, 1.7.0
-
Migration from Flink 1.5.3 to Flink 1.7.0
Description
When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:
org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:745)
The broadcast state is a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer.
There were no changes to the TypeSerializers used; only the Flink version was upgraded.
While debugging, I found that when the DefaultOperatorStateBackend class validates state compatibility, "registeredBroadcastStates", which holds the metadata of the 'old' state, contains the wrong association of key and value serializers: JsonSerializer appears as the key serializer and StringSerializer as the value serializer, when it should be the other way around.
After more digging, I found that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap, here:
https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
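To illustrate why a swap like this trips the compatibility check, here is a minimal, self-contained sketch in plain Java. It is not Flink's code: the class SerializerOrderDemo, the MetaInfo holder, and both reader methods are hypothetical, serializers are represented only by their names, and the assumption that the snapshot lists the key serializer first is made purely for the example.

```java
import java.util.Arrays;
import java.util.List;

public class SerializerOrderDemo {

    /** Hypothetical stand-in for the restored metadata of one broadcast state. */
    static final class MetaInfo {
        final String keySerializer;
        final String valueSerializer;

        MetaInfo(String keySerializer, String valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    /** Reads the pair in the order assumed for this example: key first, then value. */
    static MetaInfo readCorrect(List<String> written) {
        return new MetaInfo(written.get(0), written.get(1));
    }

    /** Reads the same pair but assigns the entries the wrong way around. */
    static MetaInfo readSwapped(List<String> written) {
        return new MetaInfo(written.get(1), written.get(0));
    }

    /** Simplified compatibility check: the restored key serializer must match the new one. */
    static boolean keySerializerCompatible(MetaInfo restored, String newKeySerializer) {
        return restored.keySerializer.equals(newKeySerializer);
    }

    public static void main(String[] args) {
        // Serializer names as they would be recorded for the state described in this report.
        List<String> written = Arrays.asList("StringSerializer", "JsonSerializer");

        // The job itself did not change, so it still registers StringSerializer for keys.
        String newKeySerializer = "StringSerializer";

        System.out.println("correct read compatible: "
                + keySerializerCompatible(readCorrect(written), newKeySerializer));
        System.out.println("swapped read compatible: "
                + keySerializerCompatible(readSwapped(written), newKeySerializer));
    }
}
```

With the swapped read, the unchanged StringSerializer is compared against what is really the value serializer, so the check fails even though nothing in the job was modified. This matches the symptom reported above.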