Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27218

Serializer in OperatorState has not been updated when new Serializers are NOT incompatible

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      OperatorState such as BroadcastState or PartitionableListState  can only be constructed via DefaultOperatorStateBackend. But when BroadcastState or PartitionableListState Serializer changes after we restart the job , it seems to have the following problems .

      As an example, we can see how PartitionableListState is initialized.

      First, RestoreOperation will construct a restored PartitionableListState based on the information in the snapshot.

      Then StateMetaInfo in partitionableListState will be updated  as the following code

      TypeSerializerSchemaCompatibility<S> stateCompatibility =
                      restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
      
      partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);

      The main problem is that there is also an internalListCopySerializer in PartitionableListState that is built using the previous Serializer and it has not been updated. 

      Therefore, when we update the StateMetaInfo, the internalListCopySerializer also needs to be updated.

       

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            mayuehappy Yue Ma
            mayuehappy Yue Ma
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment