Flink / FLINK-5892

Recover job state at the granularity of operator

    Details

      Description

      The JobGraph has no `Operator` info, so the `ExecutionGraph` can only recover state at the granularity of a task.
      As a result, an operator of the job may not recover its state from a savepoint even if the savepoint contains that operator's state.

      https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.
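
      As an illustrative example (the job and class names below are hypothetical, not from the issue): a source and a stateful map that Flink chains together execute as a single task, so task-granularity recovery keys both operators' state by that one task. Restructuring the chain later can then prevent the operator state from being matched on restore.

          // Illustrative only: the source and the stateful map are chained into one
          // task, so a task-granularity savepoint stores both operators' state under
          // a single task entry. Adding, removing, or splitting chain members later
          // can break the state mapping, even though the savepoint still holds the
          // operator's state.
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.addSource(new MySource())        // hypothetical source
             .map(new MyStatefulMapper())      // hypothetical stateful operator, chained to the source
             .print();
          env.execute("chained-job");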

        Issue Links

          Activity

          Chesnay Schepler added a comment -

          Guowei Ma, what's your progress on this issue?

          Guowei Ma added a comment -

          Hi Chesnay Schepler,
          I am discussing the proposal with Stefan Richter.
          I will work on it once the discussion finishes.

          ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3770

          FLINK-5892 Restore state on the operator level

            1. General

              This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state at the operator level. This means that the topology of a job may change with regard to chains when restoring from a 1.3 savepoint, allowing the arbitrary addition, removal, or modification of chains.

          The cornerstone for this is a semantic change for savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:

          In 1.2, a savepoint contains the states of tasks. If a task consists of multiple operators, the stored TaskState internally contains a list of states with one entry per operator.

          In 1.3, a savepoint contains only the states of operators; the notion of tasks is eliminated. If a task consists of multiple operators, we store one TaskState per operator instead. Internally, each contains a list of states with a length of 1.
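
          Illustratively (a hedged sketch with simplified, hypothetical container types; the actual conversion lives in `SavepointV2#convertToOperatorStateSavepointV2` and uses richer classes), the layout change amounts to re-keying task-indexed state by operator:

              import java.util.ArrayList;
              import java.util.LinkedHashMap;
              import java.util.List;
              import java.util.Map;

              // Hypothetical simplification: turn a 1.2-style task-keyed savepoint
              // into a 1.3-style operator-keyed one.
              final class LayoutConversionSketch {

                  static Map<String, List<byte[]>> toOperatorKeyed(
                          Map<String, List<String>> taskToOperatorIds,  // chain layout: taskId -> operatorIds
                          Map<String, List<byte[]>> taskKeyedStates) {  // 1.2: taskId -> per-operator states

                      Map<String, List<byte[]>> operatorKeyed = new LinkedHashMap<>();
                      for (Map.Entry<String, List<byte[]>> entry : taskKeyedStates.entrySet()) {
                          List<String> operatorIds = taskToOperatorIds.get(entry.getKey());
                          List<byte[]> states = entry.getValue();
                          for (int i = 0; i < states.size(); i++) {
                              // 1.3: one top-level entry per operator; the inner list has length 1
                              List<byte[]> single = new ArrayList<>(1);
                              single.add(states.get(i));
                              operatorKeyed.put(operatorIds.get(i), single);
                          }
                      }
                      return operatorKeyed;
                  }
              }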

            2. Implementation

          In order for this to work a number of changes had to be made.

          First and foremost we required a new `StateAssignmentOperation` that was aware of operators.
          (74881a2, 8be9c58, 4fa8bbd)

          Since the SAO uses the `ExecutionGraph` classes to map the restored state, it was necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
          (73427c3)

          The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received `SubtaskStates`, containing the state of a task, are broken down into SubtaskStates for the individual operators.
          (f7b8ef9)

            3. Tests

          The majority of this PR consists of new tests (roughly 60%).

          A number of tests were added under flink-tests that test the migration path from 1.2 to 1.3.
          (d1efdb1)

          These tests first restore a job from a 1.2 savepoint, without changes to the topology, verify that the state was restored correctly and finally create a new savepoint. They then restore from this migrated 1.3 savepoint, with changes to the topology for varying scenarios, and verify the correct restoration of state again.
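
          In outline, the migration tests follow this shape (a hedged sketch; `restoreJob`, `verifyState`, and the other helper names are placeholders, not the actual test utilities):

              // Illustrative structure of the 1.2 -> 1.3 migration tests; all helper
              // names below are hypothetical.
              @Test
              public void testMigrationFromFlink12Savepoint() throws Exception {
                  // 1) Restore from a 1.2 savepoint with an unchanged topology.
                  JobHandle job = restoreJob("savepoint-1.2", Topology.UNCHANGED);
                  verifyState(job);

                  // 2) Take a new (1.3-format) savepoint from the restored job.
                  String migratedSavepoint = job.triggerSavepoint();

                  // 3) Restore from the migrated savepoint with modified chains and
                  //    verify the restored state again.
                  JobHandle modified = restoreJob(migratedSavepoint, Topology.MODIFIED_CHAINS);
                  verifyState(modified);
              }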

          A new test was also added to the `CheckpointCoordinatorTest` which tests the support for topology changes without executing a job.
          (8b5430f9)

          A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two.
          (b5430f9)

            4. Other changes

          To make it more obvious that we deal with operators and not tasks, a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced where appropriate.
          (fe74023)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5982_operator_state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3770.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3770


          commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-03T15:39:50Z

          [prerequisite] Disable exception when assigning uid on chained operator

          commit 74881a2788d034db67d99d6d32dbb2cf923aed53
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-04T10:53:56Z

          [internal] Adjust SavepointLoader to new Savepoint semantics

          commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-04T13:02:55Z

          [internal] adjust PendingCheckpoint to be in line with new semantics

          commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-04T11:33:54Z

          [internal] Get operator ID's into ExecutionGraph

          commit 465805792932cb888393d9257fdefd828fa59343
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-25T16:07:16Z

          [internals] Extract several utility methods from StateAssignmentOperation

          commit 008e848715b7091c3deabc9251d9d673f5506e64
          Author: guowei.mgw <guowei.mgw@gmail.com>
          Date: 2017-04-24T09:47:47Z

          [internal] Add new StateAssignmentOperation

          commit ffb93298ce90956b9886b3526258f6a814b7e0af
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-04T13:01:07Z

          [internal] Integrate new StateAssignmentOperation version

          commit d1efdb1c34d59f04147292b320528cd2bc838244
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-03T15:40:21Z

          [tests] Add tests for chain modifications

          commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-24T11:58:07Z

          [tests] Adjust existing tests

          commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
          Author: guowei.mgw <guowei.mgw@gmail.com>
          Date: 2017-04-24T10:13:44Z

          [tests] Add tests for topology modifications

          commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
          Author: zentol <chesnay@apache.org>
          Date: 2017-04-25T14:08:07Z

          [internal] Introduce OperatorID for state business


          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113413839

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -50,7 +50,14 @@
          /** The ID of the vertex. */
          private final JobVertexID id;

          - private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
          + /** The alternative IDs of the vertex. */
          + private final ArrayList<OperatorID> idAlternatives = new ArrayList<>();

          — End diff –

          From the comments, this looks incorrect; I would expect a `List<JobVertexID>` here.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113415296

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -125,6 +132,8 @@ public JobVertex(String name) {
          public JobVertex(String name, JobVertexID id) {
          this.name = name == null ? DEFAULT_NAME : name;
          this.id = id == null ? new JobVertexID() : id;
          + this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
          + this.operatorIdsAlternatives.add(null);
          — End diff –

          Why is it required to add `null` here? It seems strange. Either this is not required, or it indicates some implicit contract about `operatorIdsAlternatives` that would at least justify a comment or (even better) a change. As far as I can see, it is just not required. Or am I missing something?

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113412507

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java —
          @@ -40,6 +40,10 @@
            public JobVertexID(long lowerPart, long upperPart) {
                super(lowerPart, upperPart);
            }

          + public JobVertexID(AbstractID id) {

          — End diff –

          Different subclasses of AbstractID are intended to introduce some kind of type safety. With this in mind, this feels like a not very transparent way of "casting" between IDs. Maybe some `convert` methods could make this a bit more explicit than offering a public constructor for this purpose?
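
          For instance (a sketch of the suggestion, not code from the PR), a named factory method keeps the conversion visible at the call site:

              // Hypothetical sketch on OperatorID: convert explicitly via a factory
              // method instead of a public AbstractID constructor.
              public static OperatorID fromJobVertexID(JobVertexID id) {
                  return new OperatorID(id.getLowerPart(), id.getUpperPart());
              }

              // A call site then reads unambiguously:
              // OperatorID operatorId = OperatorID.fromJobVertexID(vertex.getID());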

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113411306

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java —
          @@ -0,0 +1,50 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.jobgraph;
          +
          +import org.apache.flink.util.AbstractID;
          +
          +import javax.xml.bind.DatatypeConverter;
          +
          +/**
          + * A class for statistically unique operator IDs.
          + */
          +public class OperatorID extends AbstractID {
          +
          + private static final long serialVersionUID = 1L;
          +
          + public OperatorID() {
          +     super();
          + }
          +
          + public OperatorID(byte[] bytes) {
          +     super(bytes);
          + }
          +
          + public OperatorID(long lowerPart, long upperPart) {
          +     super(lowerPart, upperPart);
          + }
          +
          + public OperatorID(AbstractID id) {
          +     super(id);
          + }
          +
          + public static OperatorID fromHexString(String hexString) {
          — End diff –

          If my IDE is telling the truth, this method is never used and could be removed

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113411211

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java —
          @@ -0,0 +1,50 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.jobgraph;
          +
          +import org.apache.flink.util.AbstractID;
          +
          +import javax.xml.bind.DatatypeConverter;
          +
          +/**
          + * A class for statistically unique operator IDs.
          + */
          +public class OperatorID extends AbstractID {
          +
          + private static final long serialVersionUID = 1L;
          +
          + public OperatorID() {
          +     super();
          + }

          — End diff –

          Code style: I would introduce an empty line in between; the call to super is OK, but also not required.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113416026

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -133,11 +142,15 @@

             * @param name The name of the new job vertex.
             * @param primaryId The id of the job vertex.
             * @param alternativeIds The alternative ids of the job vertex.
          +  * @param operatorIds The ids of all operators contained in this job vertex.
          +  * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
             */
          - public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
          + public JobVertex(String name, JobVertexID primaryId, List<OperatorID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {

          — End diff –

          Again, the generic type on the third parameter seems off. I also suggest introducing line breaks in the parameter list, as it is very long. On top of that, we have a lot of parameters with the same type, which callers can easily mix up. This and the number of arguments make me wonder whether it would make sense to have just the actual IDs in the constructor, plus two methods to provide the alternative IDs for the cases that require them.
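
          A sketch of that alternative (illustrative only, not a signature from the PR):

              import java.util.ArrayList;
              import java.util.List;

              // Hypothetical shape of the suggestion: a small constructor for the IDs
              // that are always needed, plus explicit methods for the alternatives.
              class JobVertexSketch {
                  private final String name;
                  private final List<OperatorID> operatorIds;
                  private final List<OperatorID> alternativeIds = new ArrayList<>();
                  private final List<OperatorID> alternativeOperatorIds = new ArrayList<>();

                  JobVertexSketch(String name, List<OperatorID> operatorIds) {
                      this.name = name;
                      this.operatorIds = operatorIds;
                  }

                  // Only the restore paths that need legacy IDs call these.
                  void setAlternativeIds(List<OperatorID> ids) { alternativeIds.addAll(ids); }
                  void setAlternativeOperatorIds(List<OperatorID> ids) { alternativeOperatorIds.addAll(ids); }
              }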

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113416299

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java —
          @@ -39,8 +39,8 @@
            public InputFormatVertex(String name, JobVertexID id) {
                super(name, id);
            }

          - public InputFormatVertex(String name, JobVertexID id, List<JobVertexID> alternativeIds) {
          -     super(name, id, alternativeIds);
          + public InputFormatVertex(String name, JobVertexID id, List<OperatorID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {

          — End diff –

          See my comments on a similar constructor in `JobVertex`.

          ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113482738

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -125,6 +132,8 @@ public JobVertex(String name) {
          public JobVertex(String name, JobVertexID id) {
          this.name = name == null ? DEFAULT_NAME : name;
          this.id = id == null ? new JobVertexID() : id;
          + this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
          + this.operatorIdsAlternatives.add(null);
          — End diff –

          There's an implicit contract that the length of `operatorIDs` must equal that of `operatorIdsAlternatives`. We could store them as a pair instead.

          ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113482888

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -133,11 +142,15 @@

             * @param name The name of the new job vertex.
             * @param primaryId The id of the job vertex.
             * @param alternativeIds The alternative ids of the job vertex.
          +  * @param operatorIds The ids of all operators contained in this job vertex.
          +  * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
             */
          - public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
          + public JobVertex(String name, JobVertexID primaryId, List<OperatorID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {

          — End diff –

          We don't know at the time of JobVertex generation whether we need them, so we have to provide them eagerly.

          ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113482975

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -133,11 +142,15 @@

             * @param name The name of the new job vertex.
             * @param primaryId The id of the job vertex.
             * @param alternativeIds The alternative ids of the job vertex.
          +  * @param operatorIds The ids of all operators contained in this job vertex.
          +  * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
             */
          - public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
          + public JobVertex(String name, JobVertexID primaryId, List<OperatorID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {

          — End diff –

          We could add a simple `OperatorIDs` POJO that encapsulates both lists.
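
          Something along these lines (a hedged sketch; the class name and shape are illustrative):

              import java.util.ArrayList;
              import java.util.List;

              // Hypothetical sketch of the suggested POJO: keep each generated ID and
              // its (possibly null) user-defined alternative together, so the two
              // lists can never drift out of sync.
              final class OperatorIDs {
                  private final List<OperatorID> generatedIds = new ArrayList<>();
                  private final List<OperatorID> alternativeIds = new ArrayList<>();

                  void add(OperatorID generated, OperatorID alternativeOrNull) {
                      generatedIds.add(generated);
                      alternativeIds.add(alternativeOrNull);
                  }

                  List<OperatorID> getGeneratedIds() { return generatedIds; }
                  List<OperatorID> getAlternativeIds() { return alternativeIds; }
              }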

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113898244

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -125,6 +132,8 @@ public JobVertex(String name) {
          public JobVertex(String name, JobVertexID id) {
          this.name = name == null ? DEFAULT_NAME : name;
          this.id = id == null ? new JobVertexID() : id;
          + this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
          + this.operatorIdsAlternatives.add(null);
          — End diff –

          OK, in this case it really seems better to make this explicit, as you suggested. I was also wondering whether `operatorIdsAlternatives` should be a `List<List<OperatorID>>`; I just want to make sure that at most one alternative ID must be maintained per operator. But I think we can always determine the savepoint version and only need compatibility with the hasher version that was valid under that savepoint version.

          ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113903349

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -125,6 +132,8 @@ public JobVertex(String name) {
          public JobVertex(String name, JobVertexID id) {
          this.name = name == null ? DEFAULT_NAME : name;
          this.id = id == null ? new JobVertexID() : id;
          + this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
          + this.operatorIdsAlternatives.add(null);
          — End diff –

          We only need a single alternative ID for each operatorID: the one set using `setUIDHash()`.

          For savepoints created with 1.3, other alternatives aren't required, since 1.3 doesn't use the old hasher.

          Savepoints from 1.0-1.2 are converted to the 1.3 format using the alternative job vertex IDs (see `SavepointV2#convertToOperatorStateSavepointV2`), which is why we can't remove them. After the conversion, however, they are identical to a 1.3 savepoint.
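
          For reference, `setUidHash` is applied at the DataStream API level; a minimal usage sketch (the source, mapper, and hash value below are placeholders):

              // Pin an operator to the legacy hash it carried in an old savepoint.
              // MySource/MyStatefulMapper are hypothetical; the argument must be a
              // 32-character hex string, and this one is a made-up placeholder.
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.addSource(new MySource())
                 .map(new MyStatefulMapper())
                 .setUidHash("0123456789abcdef0123456789abcdef");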

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113919931

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java —
          @@ -125,6 +132,8 @@ public JobVertex(String name) {
          public JobVertex(String name, JobVertexID id) {
          this.name = name == null ? DEFAULT_NAME : name;
          this.id = id == null ? new JobVertexID() : id;
          + this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
          + this.operatorIdsAlternatives.add(null);
          — End diff –

          Yes, that is what I also expected. Just wanted to be really sure.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113928558

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -62,252 +69,390 @@
            }

            public boolean assignStates() throws Exception {
          -
          -     // this tracks if we find missing node hash ids and already use secondary mappings
          -     boolean expandedToLegacyIds = false;
          -
          +     Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
          -     for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
          -
          -         TaskState taskState = taskGroupStateEntry.getValue();
          -
          -         //--------------------------------------- find vertex for state --------------------------------------------
          -
          -         ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
          -
          -         // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
          -         // for example as generated from older flink versions, to provide backwards compatibility.
          -         if (executionJobVertex == null && !expandedToLegacyIds) {
          -             localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
          -             executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
          -             expandedToLegacyIds = true;
          -             logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
          -         }
          +     Set<OperatorID> allOperatorIDs = new HashSet<>();
          +     for (ExecutionJobVertex executionJobVertex : tasks.values()) {
          +         allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
          +     }
          +     for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) {

          — End diff –

          Renaming `taskStates` and `taskGroupStateEntry` to something with `operator` instead of `task` in it would make this more readable - maybe `operatorToStateMapping`. Just a leftover from the refactoring, I guess.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113928105

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -62,252 +69,390 @@
            }

            public boolean assignStates() throws Exception {
          -
          -     // this tracks if we find missing node hash ids and already use secondary mappings
          -     boolean expandedToLegacyIds = false;
          -
          +     Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
          -     for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
          -
          -         TaskState taskState = taskGroupStateEntry.getValue();
          -
          -         //--------------------------------------- find vertex for state --------------------------------------------
          -
          -         ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
          -
          -         // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
          -         // for example as generated from older flink versions, to provide backwards compatibility.
          -         if (executionJobVertex == null && !expandedToLegacyIds) {
          -             localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
          -             executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
          -             expandedToLegacyIds = true;
          -             logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
          -         }
          +     Set<OperatorID> allOperatorIDs = new HashSet<>();
          +     for (ExecutionJobVertex executionJobVertex : tasks.values()) {
          +         allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));

          — End diff –

          If we change to an immutable list instead of an array, this code also saves one conversion to a list.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113922612

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java —
          @@ -0,0 +1,183 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.checkpoint;
          +
          +import org.apache.flink.runtime.jobgraph.OperatorID;
          +import org.apache.flink.runtime.state.CompositeStateHandle;
          +import org.apache.flink.runtime.state.SharedStateRegistry;
          +import org.apache.flink.util.Preconditions;
          +
          +import java.util.Collection;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.Map;
          +import java.util.Objects;
          +
          +/**
          + * Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub
          + * tasks of an operator.
          + */
          +public class OperatorState implements CompositeStateHandle {
          +
          + private static final long serialVersionUID = -4845578005863201810L;
          +
          + /** id of the operator */
          + private final OperatorID operatorID;
          +
          + /** handles to non-partitioned states, subtaskindex -> subtaskstate */
          + private final Map<Integer, OperatorSubtaskState> subtaskStates;
          — End diff –

          Here and in a few other places in this class, we could add the `operator` string to the variable names, to make it clear that we are now dealing with state on the operator level and to avoid confusing Flink veterans who have a certain mental mapping for the word `(Sub)TaskState` that they would otherwise have to update.

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113924290

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java —
          @@ -139,6 +158,10 @@ public ExecutionJobVertex(
          this.serializedTaskInformation = null;

          this.taskVertices = new ExecutionVertex[numTaskVertices];
          + List<OperatorID> opIDs = jobVertex.getOperatorIDs();
          + this.operatorIDs = opIDs.toArray(new OperatorID[opIDs.size()]);
          — End diff –

          How about making `operatorIDs` an immutable list instead of an array? I think all the operations you perform could also run on an array list, and we could enforce immutability so that nobody is tempted to modify the inner state of the original array (e.g., to reverse the element order for convenience in other parts of the code). Same for the alternative IDs.
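
          A minimal sketch of the suggestion (a field assignment as it might look in the `ExecutionJobVertex` constructor; assumes `jobVertex.getOperatorIDs()` returns a `List<OperatorID>`, as the diff above shows):

              // Store an unmodifiable defensive copy instead of an array, so other
              // code cannot mutate the vertex's ID order; mutation attempts fail
              // fast with UnsupportedOperationException.
              this.operatorIDs = Collections.unmodifiableList(new ArrayList<>(jobVertex.getOperatorIDs()));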

          ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113920542

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java —

```diff
@@ -1025,11 +1026,11 @@ public boolean restoreLatestCheckpointedState(
 			LOG.info("Restoring from latest valid checkpoint: {}.", latest);

 			// re-assign the task states
-
-			final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
+			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

 			StateAssignmentOperation stateAssignmentOperation =
-					new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
+					new StateAssignmentOperation(LOG, tasks, operatorStates, allowNonRestoredState);
```

— End diff –

Not sure why this is implemented in a way that a logger is passed to the `StateAssignmentOperation`. I guess the class should simply have its own logger. I think this could be changed, but it seems like this was introduced earlier and is unrelated to this PR. Still, I wouldn't mind having this refactored to the normal logger scheme before we merge.
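For reference, the "normal logger scheme" referred to here is the standard SLF4J pattern; a minimal sketch (the method body is illustrative, not the actual class):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: the class declares its own static logger instead of
// receiving one through the constructor.
public class StateAssignmentOperation {

    private static final Logger LOG =
        LoggerFactory.getLogger(StateAssignmentOperation.class);

    public boolean assignStates() {
        LOG.info("Assigning states ...");
        return true;
    }
}
```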

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3770#discussion_r113929555

— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —

```diff
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
 	}

 	public boolean assignStates() throws Exception {
-
-		// this tracks if we find missing node hash ids and already use secondary mappings
-		boolean expandedToLegacyIds = false;
-
+		Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
 		Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
-		for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
-
-			TaskState taskState = taskGroupStateEntry.getValue();
-
-			//---------------------------------------find vertex for state--------------------------------------------
-
-			ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-			// for example as generated from older flink versions, to provide backwards compatibility.
-			if (executionJobVertex == null && !expandedToLegacyIds) {
-				localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-				executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-				expandedToLegacyIds = true;
-				logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-			}

+		Set<OperatorID> allOperatorIDs = new HashSet<>();
+		for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+			allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+		}
+		for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) {
```

— End diff –

It looks like we could factor this loop out into a private precondition method like `checkStateMappingCompleteness` or something like that. Even the previous loop and everything working on the hash set could go there.
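A hypothetical shape of such a method, based only on the loops in the diff above: the method name follows the review comment, while the signature, the exception type, and the assumption that `getOperatorIDs()` returns an array are all illustrative.

```java
// Hypothetical extracted precondition, not the actual implementation.
private static void checkStateMappingCompleteness(
		boolean allowNonRestoredState,
		Map<OperatorID, OperatorState> operatorStates,
		Map<JobVertexID, ExecutionJobVertex> tasks) {

	// "the previous loop": collect the IDs of all operators in the new job
	Set<OperatorID> allOperatorIDs = new HashSet<>();
	for (ExecutionJobVertex executionJobVertex : tasks.values()) {
		allOperatorIDs.addAll(Arrays.asList(executionJobVertex.getOperatorIDs()));
	}

	// every restored operator state must map to an operator of the new job,
	// unless non-restored state is explicitly allowed
	for (Map.Entry<OperatorID, OperatorState> entry : operatorStates.entrySet()) {
		if (!allOperatorIDs.contains(entry.getKey()) && !allowNonRestoredState) {
			throw new IllegalStateException(
					"There is no operator for the state " + entry.getKey());
		}
	}
}
```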

          Zentol Chesnay Schepler added a comment -

          Implemented in 8045fabac736cc8c6b48fda8328cf91f329dc3bf, f7980a7e29457753eb3c5b975f3bb4b59d2014f8 and 2c68085f658873c2d5836fbad6b82be76a79f0f9.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3770

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3842

          FLINK-5892 Enable 1.2 keyed state test

          This PR enables the 1.2 keyed state restore test added in FLINK-5892. It was temporarily disabled due to a general issue with backwards compatibility.

First, the utility functions in `KeyedJob` are modified to only assign UIDs when migrating and restoring, but not when generating the job.

          Second, we now actually use a 1.2 savepoint.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5892_enable_test

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3842.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3842


          commit 76edf6ae2f160527427833ae0aa94a16ab279fa0
          Author: zentol <chesnay@apache.org>
          Date: 2017-05-08T09:56:57Z

          FLINK-5892 Enable 1.2 keyed state test


          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3843

          FLINK-5892 Enable 1.2 keyed state test

          Backport of #3842 for the 1.3 branch.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 13_5892_enable_test

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3843.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3843


          commit 7a0fff31d483633f88026650956b97cc451f19df
          Author: zentol <chesnay@apache.org>
          Date: 2017-05-08T09:56:57Z

          FLINK-5892 Enable 1.2 keyed state test


          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/3843

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3844

          FLINK-5892 Enable 1.2 keyed state test

          Backport of #3842 for the 1.3 branch.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 13_5892_enable_test

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3844.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3844


          commit 7a0fff31d483633f88026650956b97cc451f19df
          Author: zentol <chesnay@apache.org>
          Date: 2017-05-08T09:56:57Z

          FLINK-5892 Enable 1.2 keyed state test


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3842

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3844#discussion_r122718852

— Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java —

```diff
@@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
 			.map(new StatefulStringStoringMap(mode, "first"))
 			.setParallelism(4);

-		// TODO: re-enable this when generating the actual 1.2 savepoint
-		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-		map.uid("first");
-		//}
+		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
```

— End diff –

I somehow don't like that it is not explained in the commit message what has actually changed or why this change was required at all. Especially since you have not changed anything else in the code, it is difficult to understand. If nothing else has changed, why do we need this `if (...)`? If something has changed, shouldn't it be covered by some test?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3844#discussion_r122894136

— Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java —

```diff
@@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
 			.map(new StatefulStringStoringMap(mode, "first"))
 			.setParallelism(4);

-		// TODO: re-enable this when generating the actual 1.2 savepoint
-		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-		map.uid("first");
-		//}
+		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
```

— End diff –

          You are completely right, the commit message/PR description isn't sufficient to explain what this PR changes. In fact, it took me a bit to remember that as well. I'll adjust the commit message later on.

So this PR is pretty subtle, since the changes to the code aren't the interesting part; the change to the `complexKeyed-flink1.2/_metadata` file is. This file is supposed to be a 1.2 savepoint, used to verify the restore behavior in 1.3. But it was not actually a 1.2 savepoint, because at the time of merging the restoration of keyed 1.2 state was broken; in the meantime we used a 1.3 savepoint instead.

          The main thing this PR does is replace this 1.3 savepoint with an actual 1.2 savepoint.

The second change is related to the UIDs. In 1.2, it is not possible to assign UIDs to chained operators. As "first" and "second" are both chained to the window function, we are not allowed to call `map.uid("...")` when generating the 1.2 savepoint, i.e. when `!(mode == MIGRATE || mode == RESTORE)`. However, in 1.3 it is possible, and in fact mandatory, to assign UIDs.
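In code, the handling described above boils down to the following pattern (a sketch assembled from the diff above; `source` and the surrounding job setup are assumed):

```java
// Sketch: assign the UID only in the 1.3 migrate/restore runs, since
// Flink 1.2 cannot assign UIDs to chained operators.
SingleOutputStreamOperator<String> map = source
		.map(new StatefulStringStoringMap(mode, "first"))
		.setParallelism(4);

if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
	map.uid("first");
}
```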

          Does that clear things up?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3844

          yes, this is present in master, but we should have it in the 1.3 branch as well.

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3844#discussion_r122897990

          — Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java —
          @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
          .map(new StatefulStringStoringMap(mode, "first"))
          .setParallelism(4);

          • // TODO: re-enable this when generating the actual 1.2 savepoint
          • //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { - map.uid("first"); - //}

            + if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {

              • End diff –

Yes, thank you very much, now I get it.

I think since this code is already on master, it's a bit too late to change the commit message there.

          githubbot ASF GitHub Bot added a comment -

          Github user pnowojski commented on the issue:

          https://github.com/apache/flink/pull/3844

Are you sure the errors on Travis are intermittent or unrelated to your change? One is already reported here: https://issues.apache.org/jira/browse/FLINK-6843, but I'm not sure about the second one:

          ```
          Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.933 sec <<< FAILURE! - in org.apache.flink.runtime.state.OperatorStateBackendTest
          testSnapshotAsyncCancel(org.apache.flink.runtime.state.OperatorStateBackendTest) Time elapsed: 0.061 sec <<< ERROR!
          java.util.concurrent.ExecutionException: java.io.IOException: Stream closed.
          at java.util.concurrent.FutureTask.report(FutureTask.java:122)
          at java.util.concurrent.FutureTask.get(FutureTask.java:206)
          at org.apache.flink.runtime.state.OperatorStateBackendTest.testSnapshotAsyncCancel(OperatorStateBackendTest.java:636)
          Caused by: java.io.IOException: Stream closed.
          at org.apache.flink.runtime.util.BlockerCheckpointStreamFactory$1.write(BlockerCheckpointStreamFactory.java:95)
          at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
          at org.apache.flink.core.io.VersionedIOReadableWritable.write(VersionedIOReadableWritable.java:40)
          at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:65)
          at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:255)
          at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
          at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3844

          That's unrelated. The changes made here can only affect the `KeyedComplexChainTest`.

          Zentol Chesnay Schepler added a comment -

The KeyedComplexChainTest was updated to properly use a 1.2 snapshot (at the time of merging, restoration of keyed state was broken).

          1.3: 0fd3683c3489d98c5c82d21b9a7a5ee93c0d6b2e
          1.4: 7a0fff31d483633f88026650956b97cc451f19df

          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/3844


            People

• Assignee:
  maguowei Guowei Ma
• Reporter:
  maguowei Guowei Ma
• Votes:
  0
• Watchers:
  3
