FLINK-7041

Deserialize StateBackend from JobCheckpointingSettings with user classloader

      Description

      A user ran into the problem that the SubmitJob message is not (de)serialisable if it contains custom RocksDB options (or a custom state backend): [1]

      The problem is that SubmitJob contains a JobGraph which contains JobCheckpointingSettings which contains a StateBackend. This StateBackend potentially has user code and therefore can only be deserialised with the user classloader.
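
      The mechanism can be illustrated with a minimal, self-contained sketch using only JDK serialization. It is not Flink's implementation: `SerializedHolder`, `CustomBackend`, `UserClassLoaderDemo` and `ship` are hypothetical names invented for this example, and only the `deserializeValue(classLoader)` call shape mirrors the diff quoted in the comments of this issue. The idea, under those assumptions, is that the enclosing message carries the user object as opaque bytes, so the message itself can be deserialised without user classes, and the bytes are unpacked later with a classloader that can see them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical demo driver; none of these classes are Flink classes. */
final class UserClassLoaderDemo {
    /** Round-trips the holder with plain Java serialization, as a message transport would. */
    @SuppressWarnings("unchecked")
    static <T> SerializedHolder<T> ship(SerializedHolder<T> holder)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(holder);
        }
        // Only SerializedHolder and byte[] are resolved here, never the wrapped user class.
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (SerializedHolder<T>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializedHolder<CustomBackend> sent =
                new SerializedHolder<>(new CustomBackend("/tmp/checkpoints"));
        SerializedHolder<CustomBackend> received = ship(sent);

        // The loader that can see the user class plays the role of the user classloader.
        CustomBackend backend = received.deserializeValue(CustomBackend.class.getClassLoader());
        System.out.println("Restored backend with dir " + backend.checkpointDir);

        // A loader limited to JDK classes cannot resolve the user class.
        try {
            received.deserializeValue(new ClassLoader(null) { });
        } catch (ClassNotFoundException e) {
            System.out.println("Without the user classloader: " + e);
        }
    }
}

/** Hypothetical stand-in for a state backend that contains user code. */
final class CustomBackend implements Serializable {
    private static final long serialVersionUID = 1L;
    final String checkpointDir;

    CustomBackend(String checkpointDir) {
        this.checkpointDir = checkpointDir;
    }
}

/** Hypothetical wrapper: keeps the value as bytes so the enclosing message never loads user classes. */
final class SerializedHolder<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] bytes;

    SerializedHolder(T value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        this.bytes = buffer.toByteArray();
    }

    /** Deserializes the wrapped value, resolving its classes through the given loader only. */
    @SuppressWarnings("unchecked")
    T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                // Strict on purpose: no fallback lookup, and primitive class
                // descriptors are not handled in this sketch.
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return (T) in.readObject();
        }
    }
}
```

      In this sketch the strict `resolveClass` makes the failure mode visible: a loader that cannot see `CustomBackend` raises `ClassNotFoundException`, which is the kind of error the receiving side would hit if it tried to deserialise user code without the user classloader.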

      This issue is mostly identical to FLINK-6531.

      [1] https://lists.apache.org/thread.html/69bb573787258ab34c3dc56ac155052f099d75e62d805f463bde5621@%3Cuser.flink.apache.org%3E

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via 64c6fa70f53425d09717e1560db88d5733601c24.
          Fixed for 1.3 via 78bbb844a0b33d4297e9fa1044401a51e219fcfa.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4232

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4232

          Thanks! 😃

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4232

          Thanks for the fixups @aljoscha :-D
          The last Travis run was successful; renaming the test should not affect anything.
          Merging this ...

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4232#discussion_r125034182

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---
          @@ -223,19 +223,21 @@ public static ExecutionGraph buildGraph(
            // if specified in the application, use from there, otherwise load from configuration
            final StateBackend metadataBackend;

          - final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
          + final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
            if (applicationConfiguredBackend != null) {
          -     metadataBackend = applicationConfiguredBackend;
          +     try {
          +         metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
          +     } catch (IOException | ClassNotFoundException e) {
          +         throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
          +     }

                log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
                        applicationConfiguredBackend);
          --- End diff --

          Good catch!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4232#discussion_r125004426

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---
          @@ -223,19 +223,21 @@ public static ExecutionGraph buildGraph(
            // if specified in the application, use from there, otherwise load from configuration
            final StateBackend metadataBackend;

          - final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
          + final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
            if (applicationConfiguredBackend != null) {
          -     metadataBackend = applicationConfiguredBackend;
          +     try {
          +         metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
          +     } catch (IOException | ClassNotFoundException e) {
          +         throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
          +     }

                log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
                        applicationConfiguredBackend);
          --- End diff --

          Need to change the argument for this log to `metadataBackend`

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4232

          Ah scratch that, perhaps the `CheckpointSettingsSerializableTest` is sufficient.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4232

          Thanks for the quick fix @aljoscha!

          Regarding tests:
          Can we also have another test that verifies that `JobGraph`s containing custom `StateBackend`s, or `StateBackend`s with custom classes, can be submitted without exceptions?
          Looks like `org.apache.flink.runtime.jobmanager.JobSubmitTest` is a good place for that.

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/4232

          FLINK-7041 Deserialize StateBackend from JobCheckpointingSettings with user classloader

          R: @tzulitai

          CC: @StephanEwen I noticed that the state backend that is in the configuration is never used anywhere. I'm guessing this is for future work or is something missing in the code?

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-7041-fix-state-backend-classloader

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4232.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4232


          commit 5ed5300b510f1439ceb516b44592e433dbc46931
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-06-30T09:19:30Z

          FLINK-7041 Deserialize StateBackend from JobCheckpointingSettings with user classloader

          This also adds a test for serializability with a class that's not in the
          default classloader.



            People

            • Assignee: Aljoscha Krettek (aljoscha)
            • Reporter: Aljoscha Krettek (aljoscha)