Details

      Description

      Currently, users are locked in to the serializer implementation that was used to write their state.
      This is suboptimal, because users may well want to change their serialization formats, state schemas, or types in the future.

      This is an umbrella JIRA for the required tasks to make this possible.

      Here is an overview of the expected outcome of this JIRA (the specific details are outlined in the respective subtasks):

      Ideally, the main user-facing change is that users implementing custom TypeSerializers will also need to implement hook methods that identify whether there is a change to the serialized format, or even a change to the serialized data type. It would be the user's responsibility to ensure that the deserialize method can bridge the change between the old and new formats.
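To make the hook idea concrete, here is a minimal sketch under stated assumptions. The names `Compatibility`, `CounterSerializer`, and `checkCompatibility` are hypothetical and are not Flink's API; the example assumes a counter whose serialized format changed from a 4-byte int (version 1) to an 8-byte long (version 2).

```java
import java.nio.ByteBuffer;

// Hypothetical result type of the compatibility hook (not Flink's API).
enum Compatibility { COMPATIBLE, REQUIRES_CONVERSION }

// Hypothetical user serializer whose format changed from int (v1) to long (v2).
final class CounterSerializer {
    static final int CURRENT_FORMAT_VERSION = 2;

    // Hook: reports whether state written with the given format version
    // matches the current format or needs to be converted.
    Compatibility checkCompatibility(int writtenFormatVersion) {
        return writtenFormatVersion == CURRENT_FORMAT_VERSION
                ? Compatibility.COMPATIBLE
                : Compatibility.REQUIRES_CONVERSION;
    }

    // Always writes the current (v2) format: an 8-byte long.
    byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // The user bridges old and new formats here.
    long deserialize(byte[] data, int writtenFormatVersion) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        switch (writtenFormatVersion) {
            case 1:
                return buffer.getInt();  // old format: 4-byte int
            case 2:
                return buffer.getLong(); // new format: 8-byte long
            default:
                throw new IllegalArgumentException(
                        "Unknown format version: " + writtenFormatVersion);
        }
    }
}
```

With this shape, the runtime would only need to ask the hook whether conversion is required; how the old bytes are interpreted stays entirely inside the user's deserialize method.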

      For Flink's built-in serializers that are automatically built using the user's configuration (most notably the more complex KryoSerializer and GenericArraySerializer), Flink should be able to automatically "reconfigure" them using the new configuration, so that the reconfigured versions can be used to de- / serialize previous state. This would require knowledge of the previous configuration of the serializer, therefore "serializer configuration metadata" will be added to savepoints.

      Note that for the first version of this, although additional infrastructure (e.g. serializer reconfigure hooks, serializer configuration metadata in savepoints) will be added to potentially allow Kryo version upgrade, this JIRA will not cover this. Kryo has breaking binary formats across major versions, and will most likely need some further changes. Therefore, for the KryoSerializer, "upgrading" it simply means changes in the registration of specific / default serializers, at least for now.
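As a rough illustration of what "reconfiguring" from a changed registration could look like, the sketch below merges the previous registration order (as it might be recorded in the savepoint metadata) with the new one. It assumes that registration ids follow registration order, so previously registered classes must keep their positions. `RegistrationReconfigurer` is a hypothetical helper, not part of Flink or Kryo, and classes are represented by name only.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper (not part of Flink or Kryo).
final class RegistrationReconfigurer {

    // Classes from the previous configuration keep their original positions;
    // classes that appear only in the new configuration are appended after them.
    static List<String> reconfigure(
            List<String> previousRegistrations, List<String> newRegistrations) {
        // LinkedHashSet keeps first-insertion order and ignores re-added entries.
        LinkedHashSet<String> merged = new LinkedHashSet<>(previousRegistrations);
        merged.addAll(newRegistrations);
        return new ArrayList<>(merged);
    }
}
```

For example, a previous order of A, B combined with a new configuration of C, B, A yields A, B, C, so state written under the old order would still resolve A and B to the same positions.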

      Finally, we would need to add a "convertState" phase to the task lifecycle that takes place after the "open" phase and before checkpointing starts / the task starts running. It can only happen after "open", because only then can we be certain whether any reconfiguration of state serialization has occurred and state needs to be converted. Ideally, the code for the "convertState" phase is designed so that it can be easily exposed as an offline tool in the future.
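The conversion step itself could be sketched as a pure function that reads every entry with the serializer that wrote it and rewrites it with the new one. Everything here is an assumption for illustration: `SimpleSerializer` and `StateConverter` are hypothetical names, and strings stand in for serialized bytes. Keeping the function free of any task or runtime dependency is what would make it reusable from an offline tool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a serializer: a pair of write/read functions.
// Strings play the role of serialized bytes to keep the sketch short.
final class SimpleSerializer<T> {
    final Function<T, String> write;
    final Function<String, T> read;

    SimpleSerializer(Function<T, String> write, Function<String, T> read) {
        this.write = write;
        this.read = read;
    }
}

// Hypothetical conversion step, intended to run once after open()
// and before any records are processed.
final class StateConverter {

    static <T> Map<String, String> convertState(
            Map<String, String> restoredState,
            SimpleSerializer<T> previousSerializer,
            SimpleSerializer<T> currentSerializer) {
        Map<String, String> converted = new HashMap<>();
        for (Map.Entry<String, String> entry : restoredState.entrySet()) {
            // Read with the old format, then write with the new format.
            T value = previousSerializer.read.apply(entry.getValue());
            converted.put(entry.getKey(), currentSerializer.write.apply(value));
        }
        return converted;
    }
}
```

Because the input map is left untouched and a new map is returned, a failed conversion would not corrupt the restored state in this sketch.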

      For this JIRA, we should simply assume that after open(), we have all the required information and serializers are appropriately reconfigured. Stefan Richter is currently planning to deprecate RuntimeContext state registration methods in favor of a new interface that enforces eager state registration, so that we may have all the info after open().

        Activity

        xiaogang.shi Xiaogang Shi added a comment -

        The idea of allowing upgrades to state serializers is excellent. But I have some concerns about the "convertState" phase. Currently, Flink has no knowledge of the serializers to use before users access the states (via the methods provided in RuntimeContext). That means we can only convert the states when users are about to access them. The conversion may be very costly, and the processing of data streams could be paused for quite a long time.

        Actually, I am very interested in the offline tool that may be provided in the future. A lot of effort currently goes into the Flink runtime to allow restoring from old savepoints, which makes the code complicated and hard to follow. I would prefer to move that logic from the main program to the offline tool.

        I think the offline tool would also ease the burden on users of implementing TypeSerializers that can deserialize data in different serialization formats. They would only need to provide the new serializers to access the states stored in the savepoints.

        tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

        Hi Xiaogang Shi!

        You're absolutely correct. I had an offline discussion with Stefan Richter about the timing of state registrations, and indeed, current state registration through RuntimeContext poses problems for the state conversion. Simply put, Stefan is currently planning to deprecate RuntimeContext state registration methods in favor of a new interface that enforces eager state registration, so that we may have all the info after open(). For this JIRA, we should simply assume that after open(), we have all the required information and serializers are appropriately reconfigured. With this assumption, state conversion will only happen once after open(), and before any data processing starts. Perhaps I should add that assumption to the JIRA description.

        As for the old interface, we might not support the possibility to upgrade serializers for users that use the old interface.

        Regarding the offline tool:
        Originally I was thinking about it just for the use of state migration in the presence of serializer upgrades, but your suggestion to include cross-version state migration also seems like a good idea! We can keep that in mind when thinking about how to design the "convertState" phase.

        xiaogang.shi Xiaogang Shi added a comment -

        Tzu-Li (Gordon) Tai Thanks a lot for your quick response. The changes to the interfaces in RuntimeContext sound great! They do help in the conversion of savepoints. Looking forward to them.

        tzulitai Tzu-Li (Gordon) Tai added a comment - Merged for 1.3.0 with https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5
        yuzhihong@gmail.com Ted Yu added a comment -

        WritableSerializerConfigSnapshot#getVersion() returns an int.
        Should we distinguish between major and minor versions?

        yuzhihong@gmail.com Ted Yu added a comment -

        For EnumSerializerConfigSnapshot:

        +   public boolean equals(Object obj) {
        +     return super.equals(obj)
        +         && Arrays.equals(
        +           enumConstants,
        +           ((EnumSerializerConfigSnapshot) obj).getEnumConstants());
        +   }

        Should there be a check that obj is an instance of EnumSerializerConfigSnapshot before casting?

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Ted Yu, the version of config snapshots is just an indicator. The intention was that if users had to change anything in the config snapshot (written info, serialization format, etc.), they simply bump the version value. Does that make sense?

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Ted Yu regarding the equals: the base `super.equals(obj)` already performs the class check.
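To illustrate why the unchecked cast is safe under that reasoning, here is a minimal sketch. It assumes the base class compares exact runtime classes in equals(); `BaseConfigSnapshot` and `EnumConstantsSnapshot` are hypothetical stand-ins, not the Flink classes discussed above.

```java
import java.util.Arrays;

// Hypothetical base class; assumed to compare exact runtime classes.
abstract class BaseConfigSnapshot {
    @Override
    public boolean equals(Object obj) {
        // Rejects null and any object of a different class.
        return obj != null && obj.getClass().equals(getClass());
    }

    @Override
    public int hashCode() {
        return getClass().hashCode();
    }
}

// Hypothetical subclass mirroring the shape of the quoted equals method.
final class EnumConstantsSnapshot extends BaseConfigSnapshot {
    private final String[] enumConstants;

    EnumConstantsSnapshot(String... enumConstants) {
        this.enumConstants = enumConstants.clone();
    }

    String[] getEnumConstants() {
        return enumConstants;
    }

    @Override
    public boolean equals(Object obj) {
        // && short-circuits: the cast is evaluated only if super.equals(obj)
        // already confirmed that obj has exactly this class.
        return super.equals(obj)
                && Arrays.equals(
                        enumConstants,
                        ((EnumConstantsSnapshot) obj).getEnumConstants());
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + Arrays.hashCode(enumConstants);
    }
}
```

In this sketch, comparing a snapshot with null or with an unrelated object returns false without reaching the cast, so no ClassCastException can occur.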

        yuzhihong@gmail.com Ted Yu added a comment -

        Alright.

        Thanks, Gordon.


          People

          • Assignee:
            tzulitai Tzu-Li (Gordon) Tai
            Reporter:
            tzulitai Tzu-Li (Gordon) Tai
          • Votes:
            0
            Watchers:
            5
