  1. Flink
  2. FLINK-6353

Restoring using CheckpointedRestoring does not work from 1.2 to 1.2

    Details

      Description

      State that was checkpointed using Checkpointed (on a user function) cannot be restored using CheckpointedRestoring when the savepoint was taken on Flink 1.2. The reason is an overzealous check in AbstractUdfStreamOperator that only restores from "legacy" operator state using CheckpointedRestoring when the stream is a Migration stream.

      We can remove that check but still need to make sure to read away the byte that indicates whether there is legacy state, which is written when we're restoring from a Flink 1.1 savepoint.
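      The point about the marker byte can be illustrated with a small, self-contained sketch. It assumes a layout in which a migration (Flink 1.1) stream carries one marker byte in front of the legacy state while a Flink 1.2 stream does not. That layout, the class name LegacyStateReadSketch, and both methods are hypothetical and only model the idea; this is not the code in AbstractUdfStreamOperator.

```java
import java.nio.ByteBuffer;

public class LegacyStateReadSketch {

    /** Writes a hypothetical state buffer; only migration streams get the marker byte. */
    public static ByteBuffer writeState(boolean migrationStream, int state) {
        ByteBuffer buffer = ByteBuffer.allocate(migrationStream ? 5 : 4);
        if (migrationStream) {
            buffer.put((byte) 1); // marker: "legacy state follows" (assumed layout)
        }
        buffer.putInt(state);
        buffer.flip();
        return buffer;
    }

    /**
     * Restores legacy state from both kinds of stream. The legacy state is read
     * in either case; the marker byte is consumed only when it is present.
     */
    public static int restoreLegacyState(ByteBuffer in, boolean migrationStream) {
        if (migrationStream) {
            byte marker = in.get(); // read away the marker so the offset is correct
            if (marker != 1) {
                throw new IllegalStateException("Inconsistent migration stream, marker = " + marker);
            }
        }
        return in.getInt();
    }

    public static void main(String[] args) {
        System.out.println(restoreLegacyState(writeState(true, 42), true));
        System.out.println(restoreLegacyState(writeState(false, 42), false));
    }
}
```

      In this model, reading a migration buffer without consuming the marker first shifts the offset by one byte and returns a value other than the one that was written, which is why the byte still has to be read away once the check is gone.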

      Also, if we remove the check, the procedure for a user to migrate a user function away from the Checkpointed interface is this:

      1. Perform a savepoint with the user function still implementing Checkpointed, then shut down the job
      2. Change the user function to implement CheckpointedRestoring
      3. Restore from the previous savepoint. The user function has to move the state that is restored using CheckpointedRestoring to another type of state, e.g. operator state, using the OperatorStateStore.
      4. Perform another savepoint, then shut down the job
      5. Remove CheckpointedRestoring interface from user function
      6. Restore from the second savepoint
      7. Done.

      If the CheckpointedRestoring interface is not removed as prescribed in the last steps, a future restore of a new savepoint will fail because Flink will try to read legacy operator state that is no longer there.
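      The migration steps and the failure mode can be modelled in plain Java. Everything in this sketch is a hypothetical stand-in: Savepoint, LegacyRestoring (named after the role of CheckpointedRestoring), CounterFunction, MigratingCounter and the restore method are simplified illustrations, not Flink classes or Flink's actual restore logic.

```java
import java.util.ArrayList;
import java.util.List;

public class MigrationSketch {

    /** Hypothetical savepoint model: one legacy slot plus list-style operator state. */
    static final class Savepoint {
        final Long legacyState;          // null: no legacy state was written
        final List<Long> operatorState;

        Savepoint(Long legacyState, List<Long> operatorState) {
            this.legacyState = legacyState;
            this.operatorState = operatorState;
        }
    }

    /** Simplified stand-in for the legacy restore hook (not Flink's real interface). */
    interface LegacyRestoring {
        void restoreState(Long state);
    }

    /** Final version of the user function (step 5): operator state only. */
    static class CounterFunction {
        final List<Long> operatorState = new ArrayList<>();

        Savepoint snapshot() {
            // No legacy state is produced any more.
            return new Savepoint(null, new ArrayList<>(operatorState));
        }
    }

    /** Intermediate version (steps 2 to 4): the hook moves the value into operator state. */
    static final class MigratingCounter extends CounterFunction implements LegacyRestoring {
        @Override
        public void restoreState(Long state) {
            operatorState.add(state);
        }
    }

    /** Models the runtime: a function with the legacy hook expects legacy state to exist. */
    static void restore(CounterFunction function, Savepoint savepoint) {
        if (function instanceof LegacyRestoring) {
            if (savepoint.legacyState == null) {
                throw new IllegalStateException("savepoint contains no legacy state");
            }
            ((LegacyRestoring) function).restoreState(savepoint.legacyState);
        }
        function.operatorState.addAll(savepoint.operatorState);
    }

    /** Steps 1 to 6; returns the operator state seen by the final function. */
    public static List<Long> runMigration() {
        Savepoint first = new Savepoint(7L, new ArrayList<>()); // step 1: legacy state only
        MigratingCounter migrating = new MigratingCounter();    // step 2
        restore(migrating, first);                              // step 3
        Savepoint second = migrating.snapshot();                // step 4
        CounterFunction finalVersion = new CounterFunction();   // step 5
        restore(finalVersion, second);                          // step 6
        return finalVersion.operatorState;
    }

    /** Keeping the hook and restoring from the second savepoint fails in this model. */
    public static boolean restoreFailsIfHookIsKept() {
        MigratingCounter migrating = new MigratingCounter();
        restore(migrating, new Savepoint(7L, new ArrayList<>()));
        Savepoint second = migrating.snapshot();
        try {
            restore(new MigratingCounter(), second);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runMigration());
        System.out.println(restoreFailsIfHookIsKept());
    }
}
```

      The second savepoint contains only operator state, so the function version that still carries the legacy hook cannot be restored from it. This is the reason the hook has to be removed in step 5 before restoring in step 6.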

      The above steps also apply to Flink 1.3, when a user wants to move away from the Checkpointed interface.

        Activity

        srichter Stefan Richter added a comment - - edited

        Just a small idea: do you think that, from a user's perspective, we should simply allow implementing both Checkpointed and the new CheckpointedFunction, with the result that CheckpointedFunction overrides Checkpointed for snapshots, essentially making it equivalent to CheckpointedRestoring? We can log that Checkpointed is disabled for snapshots. Then, starting from the next savepoint, the Checkpointed interface could be dropped at any time, or kept for backwards compatibility. This way we could avoid fiddling around with too many different interfaces.

        Then the upgrade story is simply:

        1. Implement CheckpointedFunction.
        2. Drop Checkpointed at convenience, when backwards compatibility is obsolete.

        aljoscha Aljoscha Krettek added a comment -

        So having CheckpointedFunction would mean that Checkpointed.snapshotState() is never called while Checkpointed.restoreState() is called only the first time we restore from a snapshot with legacy state. Correct? That sounds good and we could do it in addition to the complicated way, which just works without us adding anything once this fix is in.

        srichter Stefan Richter added a comment - - edited

        Yes, that is the idea. After implementing CheckpointedFunction, no more legacy state should be produced in future check/savepoints, and therefore no more calls to Checkpointed::restoreState() will happen when restoring from those new check/savepoints. The old interface can then be kept or dropped at the user's convenience. I think all we need to change is how some instanceof checks are evaluated; iirc, implementing both interfaces currently leads to an (intentional) runtime exception.
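        A rough sketch of how the proposed instanceof evaluation could behave is given below. It models the proposal discussed in this thread, not behavior that exists in Flink. LegacyCheckpointed and NewCheckpointed are hypothetical stand-ins for Checkpointed and CheckpointedFunction with simplified signatures, and Snapshot, snapshot and restore are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class DualInterfaceSketch {

    /** Simplified stand-in for the legacy interface. */
    interface LegacyCheckpointed {
        long snapshotState();
        void restoreState(long state);
    }

    /** Simplified stand-in for the new interface. */
    interface NewCheckpointed {
        void snapshotState(List<Long> operatorState);
        void initializeState(List<Long> restored);
    }

    static final class Snapshot {
        final Long legacy;               // null: no legacy state in this snapshot
        final List<Long> operatorState;

        Snapshot(Long legacy, List<Long> operatorState) {
            this.legacy = legacy;
            this.operatorState = operatorState;
        }
    }

    /** The new interface wins for snapshots, so no legacy state is produced. */
    static Snapshot snapshot(Object function) {
        if (function instanceof NewCheckpointed) {
            List<Long> out = new ArrayList<>();
            ((NewCheckpointed) function).snapshotState(out);
            return new Snapshot(null, out);
        }
        if (function instanceof LegacyCheckpointed) {
            return new Snapshot(((LegacyCheckpointed) function).snapshotState(), new ArrayList<>());
        }
        return new Snapshot(null, new ArrayList<>());
    }

    /** The legacy restore hook is called only if the snapshot holds legacy state. */
    static void restore(Object function, Snapshot snapshot) {
        if (snapshot.legacy != null && function instanceof LegacyCheckpointed) {
            ((LegacyCheckpointed) function).restoreState(snapshot.legacy);
        }
        if (function instanceof NewCheckpointed) {
            ((NewCheckpointed) function).initializeState(snapshot.operatorState);
        }
    }

    /** A user function that implements both interfaces during the transition. */
    static final class Counter implements LegacyCheckpointed, NewCheckpointed {
        long count;
        int legacyRestoreCalls;
        int legacySnapshotCalls;

        @Override public long snapshotState() { legacySnapshotCalls++; return count; }
        @Override public void restoreState(long state) { legacyRestoreCalls++; count = state; }
        @Override public void snapshotState(List<Long> operatorState) { operatorState.add(count); }
        @Override public void initializeState(List<Long> restored) {
            if (!restored.isEmpty()) {
                count = restored.get(0);
            }
        }
    }

    /** Returns {first.legacyRestoreCalls, first.legacySnapshotCalls, second.count, second.legacyRestoreCalls}. */
    public static long[] demo() {
        Counter first = new Counter();
        restore(first, new Snapshot(7L, new ArrayList<>())); // old savepoint with legacy state
        Snapshot next = snapshot(first);                      // written via the new interface only
        Counter second = new Counter();
        restore(second, next);                                // no legacy state left to restore
        return new long[] {first.legacyRestoreCalls, first.legacySnapshotCalls,
                           second.count, second.legacyRestoreCalls};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

        In this model the legacy restore hook runs once, for the old savepoint, and the legacy snapshot method is never called, which matches the semantics described in the comments above.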

        StephanEwen Stephan Ewen added a comment -

        Aljoscha Krettek, with the set of tests that went into master yesterday, is this issue solved?

        aljoscha Aljoscha Krettek added a comment -

        It does work now, yes, with the process outlined in the description. I left it open because we might want to implement Stefan Richter's idea to simplify the migration process. We can open a separate issue for that, though.


          People

          • Assignee:
            aljoscha Aljoscha Krettek
            Reporter:
            aljoscha Aljoscha Krettek