FLINK-6714: Operator state backend should set user classloader as context classloader when snapshotting

    Details

      Description

      Now that the operator state backend creates a deep copy of the state during the synchronous part of asynchronous checkpoints, it needs to set the user classloader as the thread context classloader. Otherwise, serializers that copy by serializing and deserializing will use the wrong classloader when deserializing the copy.
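To illustrate why the context classloader matters here, the following is a minimal sketch of a copy-by-serialization helper whose deserialization step resolves classes through the thread context classloader. The names `SerializationCopy` and `ClassLoaderObjectInputStream` are hypothetical and chosen for this example; this is not Flink's actual serializer code, only an assumption about the general shape of such a copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public final class SerializationCopy {

    /** Hypothetical stream that resolves classes through a given classloader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution (covers primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    /** Deep-copies a value by writing it out and reading it back. */
    static <T extends Serializable> T copy(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }

        // The classloader is picked up from the calling thread. If the caller
        // has not installed the user classloader, user classes may not resolve.
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), contextClassLoader)) {
            @SuppressWarnings("unchecked")
            T copied = (T) in.readObject();
            return copied;
        }
    }
}
```

In a sketch like this, a class that is visible only to the user classloader is found during `readObject()` only if the calling thread has that loader set as its context classloader, which is the situation the issue describes for the synchronous snapshot phase.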

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3987

          FLINK-6714 [runtime] Use user classloader for operator state copying on snapshots

          Now that the operator state backend creates a deep copy of the state during the synchronous part of asynchronous checkpoints, it needs to set the user classloader as the thread context classloader. Otherwise, serializers that copy by serializing and deserializing will use the wrong classloader when deserializing the copy.

          A test is added to OperatorStateBackendTest to verify that the user classloader is properly used on copy.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6714

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3987.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3987


          commit 516fd085d640d56ea2ef7be7f0b11f6ad43e92f5
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-25T08:54:08Z

          FLINK-6714 [runtime] Use user classloader for operator state copying on snapshots


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3987

          Manually verified that https://issues.apache.org/jira/browse/FLINK-6515 should be properly fixed with this change.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3987

          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3987#discussion_r118530090

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
          @@ -185,13 +185,18 @@ public void dispose() {
                   new HashMap<>(registeredStates.size());

               // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
          -    for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
          -
          -        PartitionableListState<?> listState = entry.getValue();
          -        if (null != listState) {
          -            listState = listState.deepCopy();
          +    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
          +    try {
          +        Thread.currentThread().setContextClassLoader(userClassloader);
          --- End diff ---

          (nitpick) `Thread.currentThread().setContextClassLoader(userClassloader)` should occur before the `try` block is entered. Setting the TCCL occurs in about a half-dozen places in Flink, and this is inconsistent with most of them.
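The ordering suggested in this review comment can be sketched as follows: save the current context classloader, install the new one before entering `try`, and restore the saved one in `finally`. The class `ContextClassLoaderScope` and the method `callWithContextClassLoader` are hypothetical names for this illustration and are not part of Flink; the merged change may look different.

```java
import java.util.function.Supplier;

public final class ContextClassLoaderScope {

    /**
     * Runs the action with the given classloader installed as the thread
     * context classloader (TCCL), then restores the previous one.
     */
    static <T> T callWithContextClassLoader(ClassLoader classLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();

        // Set the TCCL before entering the try block, as the review suggests.
        thread.setContextClassLoader(classLoader);
        try {
            return action.get();
        } finally {
            // Always restore the original TCCL, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}
```

Placing the `setContextClassLoader` call outside the `try` means the `finally` block only runs once the new loader has actually been installed, so the restore step always undoes a change that really happened.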

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3987

          Thanks for the review @rmetzger @EronWright.
          I'll address Eron's comment while merging.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3987

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via b2f5dab330c8803ab72e2bbf4e94d68fc760c467.
          Fixed for 1.3 via e0454234748f838799f905e9271253dfa808a797.


            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 2
