Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27218

Serializer in OperatorState has not been updated when new Serializers are NOT incompatible

    XMLWordPrintableJSON

Details

    Description

      OperatorState such as BroadcastState or PartitionableListState  can only be constructed via DefaultOperatorStateBackend. But when BroadcastState or PartitionableListState Serializer changes after we restart the job , it seems to have the following problems .

      As an example, we can see how PartitionableListState is initialized.

      First, RestoreOperation will construct a restored PartitionableListState based on the information in the snapshot.

      Then StateMetaInfo in partitionableListState will be updated  as the following code

      TypeSerializerSchemaCompatibility<S> stateCompatibility =
                      restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
      
      partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);

      The main problem is that there is also an internalListCopySerializer in PartitionableListState that is built using the previous Serializer and it has not been updated. 

      Therefore, when we update the StateMetaInfo, the internalListCopySerializer also needs to be updated.

       

      Attachments

        Issue Links

          Activity

            People

              mayuehappy Yue Ma
              mayuehappy Yue Ma
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: