FLINK-5979

Backwards compatibility for HeapKeyedStateBackend serialization format (1.2 -> 1.3)

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Labels:
      None

      Description

      Changes in HeapKeyedStateBackend for Flink 1.3 have triggered a change of the serialization format. This issue tracks the backwards compatibility story between the format in 1.2 and 1.3.
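      One common way to keep an older format readable is to write a version tag at the start of each snapshot and dispatch on it when reading. The sketch below illustrates that idea only. The class name, the version constants and both byte layouts (nested per-namespace maps for the old format, a flat list of entries for the new one) are assumptions made for illustration, not the layouts Flink actually uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: one reader that accepts two snapshot layouts by checking a version tag. */
final class VersionedTableFormat {

    static final int LEGACY = 1; // assumed stand-in for the older, nested layout
    static final int FLAT = 2;   // assumed stand-in for the newer, flat layout

    /** Assumed legacy layout: version, #namespaces, then per namespace its name, #keys and key/state pairs. */
    static byte[] writeLegacy(Map<String, Map<Integer, String>> table) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(LEGACY);
            out.writeInt(table.size());
            for (Map.Entry<String, Map<Integer, String>> ns : table.entrySet()) {
                out.writeUTF(ns.getKey());
                out.writeInt(ns.getValue().size());
                for (Map.Entry<Integer, String> entry : ns.getValue().entrySet()) {
                    out.writeInt(entry.getKey());
                    out.writeUTF(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Assumed flat layout: version, #entries, then (namespace, key, state) triples. */
    static byte[] writeFlat(Map<String, Map<Integer, String>> table) {
        int entries = 0;
        for (Map<Integer, String> perNamespace : table.values()) {
            entries += perNamespace.size();
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(FLAT);
            out.writeInt(entries);
            for (Map.Entry<String, Map<Integer, String>> ns : table.entrySet()) {
                for (Map.Entry<Integer, String> entry : ns.getValue().entrySet()) {
                    out.writeUTF(ns.getKey());
                    out.writeInt(entry.getKey());
                    out.writeUTF(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout; the version tag decides which branch parses the remaining bytes. */
    static Map<String, Map<Integer, String>> read(byte[] snapshot) {
        Map<String, Map<Integer, String>> table = new TreeMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int version = in.readInt();
            if (version == LEGACY) {
                int namespaces = in.readInt();
                for (int i = 0; i < namespaces; i++) {
                    String namespace = in.readUTF();
                    int keys = in.readInt();
                    Map<Integer, String> perNamespace = table.computeIfAbsent(namespace, n -> new TreeMap<>());
                    for (int j = 0; j < keys; j++) {
                        int key = in.readInt();
                        perNamespace.put(key, in.readUTF());
                    }
                }
            } else if (version == FLAT) {
                int entries = in.readInt();
                for (int i = 0; i < entries; i++) {
                    String namespace = in.readUTF();
                    int key = in.readInt();
                    String state = in.readUTF();
                    table.computeIfAbsent(namespace, n -> new TreeMap<>()).put(key, state);
                }
            } else {
                throw new IllegalStateException("Unknown format version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return table;
    }
}
```

      With this shape, a test for backwards compatibility can write a table in the old layout, read it back through the shared reader, and compare the result with the original table.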

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3483

          FLINK-5979 Backwards compatibility for HeapKeyedStateBackend serialization format (1.2 -> 1.3)

          This PR re-establishes the broken backwards compatibility in the serialization format of `KeyedStateTable` between Flink 1.2 and Flink 1.3.

          This PR sits on top of #3466.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink state-table-interface-backwards-compatibility

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3483.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3483


          commit dd0863fc029a0d5ab3d52bb402663c8543f7f483
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-02-20T17:12:10Z

          Introduce abstraction for StateTable

          commit a0cc386d29dd6a76926fb260cac43019add5bc2a
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-03T09:51:15Z

          Improved copy performance for ArrayListSerializer

          commit 222ae47622372268af895a1c3e4d559b5ece87ab
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-03T09:25:43Z

          Improve ManualWindowSpeedITCase by randomizing the access pattern

          commit fdae4cd7674a755c43260e6b7ba756431157c141
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-03T09:50:52Z

          Additional unit tests

          commit e13237e025691a4e3500e984c9b50364d916aa8c
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-03T10:08:00Z

          Asynchronous snapshots through CopyOnWriteStateTable

          commit e92275b1ecef7fead004d92b5c6fc488dadbb52c
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-06T17:04:17Z

          Implement backwards compatibility for state table serialization format.

          commit 2b083b84b371d75e1a59feeea1273938325af505
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-07T10:37:36Z

          Unit test


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3483

          R @aljoscha

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106135988

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java —
          @@ -359,4 +360,8 @@ public KeyGroupRange getKeyGroupRange() {
          public void close() throws IOException {
          cancelStreamRegistry.close();
          }

          +
          + public boolean supportsAsynchronousSnapshots() {
          — End diff –

          This should probably be `@VisibleForTesting`?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106139194

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java —
          @@ -0,0 +1,100 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import java.util.Objects;
          +
          +/**
          + * One full entry in a state table. Consists of an immutable key (not null), an immutable namespace (not null), and
          + * a state that can be mutable and null.
          + *
          + * @param <K> type of key
          + * @param <N> type of namespace
          + * @param <S> type of value
          + */
          +public class StateEntry<K, N, S> {
          — End diff –

          Is this used anywhere other than as a base class for `StateTableEntry` in `CopyOnWriteStateTable`?
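          For readers without the full diff at hand, the Javadoc quoted above suggests roughly the following shape. This is a minimal sketch reconstructed from that description only (immutable non-null key and namespace, a state that may be null and may be replaced); the class name `StateEntrySketch` and its accessors are hypothetical and are not the code from the pull request.

```java
import java.util.Objects;

/** Hypothetical sketch of a state table entry: fixed key and namespace, replaceable state. */
class StateEntrySketch<K, N, S> {

    private final K key;       // immutable, never null
    private final N namespace; // immutable, never null
    private S state;           // may be null and may be replaced

    StateEntrySketch(K key, N namespace, S state) {
        this.key = Objects.requireNonNull(key, "key");
        this.namespace = Objects.requireNonNull(namespace, "namespace");
        this.state = state;
    }

    K getKey() {
        return key;
    }

    N getNamespace() {
        return namespace;
    }

    S getState() {
        return state;
    }

    void setState(S state) {
        this.state = state;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StateEntrySketch)) {
            return false;
        }
        StateEntrySketch<?, ?, ?> other = (StateEntrySketch<?, ?, ?>) o;
        return key.equals(other.key)
                && namespace.equals(other.namespace)
                && Objects.equals(state, other.state);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, namespace, state);
    }
}
```

          The class is left non-final in the sketch because the review question concerns its use as a base class.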

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106132349

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyContext.java —
          @@ -0,0 +1,58 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +
          +/**
          + * This interface is the current context of a keyed state. It provides information about the currently selected key in
          + * the context, the corresponding key-group, and other key and key-grouping related information.
          + * <p>
          + * The typical use case for this interface is providing a view on the current-key selection aspects of
          + * {@link org.apache.flink.runtime.state.KeyedStateBackend}.
          + */
          +public interface KeyContext<K> {
          — End diff –

          We had a quick offline discussion about this because there already exists a `KeyContext` in Flink. Maybe you can rename this one to `InternalKeyContext` for now because it serves somewhat different purposes.
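          To make the purpose of the interface concrete, here is a minimal sketch of a key context under the suggested name. Only the idea (a view on the currently selected key and its key-group) comes from the quoted Javadoc. The method names, the `SimpleKeyContext` class and the modulo-based key-group assignment are assumptions for illustration, and the sketch leaves out the serializer and key-group-range aspects hinted at by the imports in the diff.

```java
import java.util.Objects;

/** Hypothetical sketch of a read-only view on the current key selection. */
interface InternalKeyContextSketch<K> {

    /** The key that is currently selected. */
    K getCurrentKey();

    /** The key-group that the current key belongs to. */
    int getCurrentKeyGroupIndex();

    /** The total number of key-groups. */
    int getNumberOfKeyGroups();
}

/** Hypothetical implementation; the key-group assignment here is a simple modulo, an assumption. */
final class SimpleKeyContext<K> implements InternalKeyContextSketch<K> {

    private final int numberOfKeyGroups;
    private K currentKey;
    private int currentKeyGroupIndex;

    SimpleKeyContext(int numberOfKeyGroups) {
        if (numberOfKeyGroups <= 0) {
            throw new IllegalArgumentException("numberOfKeyGroups must be positive");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Selects a key and derives its key-group once, so state accesses can reuse the result. */
    void setCurrentKey(K key) {
        this.currentKey = Objects.requireNonNull(key, "key");
        this.currentKeyGroupIndex = Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public int getCurrentKeyGroupIndex() {
        return currentKeyGroupIndex;
    }

    @Override
    public int getNumberOfKeyGroups() {
        return numberOfKeyGroups;
    }
}
```

          A state table that holds such a context does not need a reference to the whole backend in order to learn which key is active.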

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106138010

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java —
          @@ -0,0 +1,1021 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          +import org.apache.flink.runtime.state.StateTransformationFunction;
          +import org.apache.flink.util.MathUtils;
          +import org.apache.flink.util.Preconditions;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import java.util.Arrays;
          +import java.util.ConcurrentModificationException;
          +import java.util.Iterator;
          +import java.util.NoSuchElementException;
          +import java.util.TreeSet;
          +
          +/**
          + * Basis for Flink's in-memory state tables with copy-on-write support. This map does not support null values for
          + * key or namespace.
          + * <p>
          + * StateTable sacrifices some peak performance and memory efficiency for features like incremental rehashing and
          — End diff –

          Could be `{@code CopyOnWriteStateTable} sacrifices ...`

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106133243

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java —
          @@ -47,77 +44,62 @@
           /**
            * Creates a new key/value state for the given hash map of key/value pairs.
            *
          - * @param backend The state backend backing that created this state.
            * @param stateDesc The state identifier for the state. This contains name
            * and can create a default state value.
            * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
            */
           protected AbstractHeapMergingState(
          - KeyedStateBackend<K> backend,
           SD stateDesc,
           StateTable<K, N, SV> stateTable,
           TypeSerializer<K> keySerializer,
           TypeSerializer<N> namespaceSerializer) {
          - super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
          + super(stateDesc, stateTable, keySerializer, namespaceSerializer);
          + this.mergeTransformation = new MergeTransformation();
           }

          + private final MergeTransformation mergeTransformation;
          — End diff –

          Curious placement of field. 😉

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106136980

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java —
          @@ -0,0 +1,107 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.state.ListStateDescriptor;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.common.typeutils.base.StringSerializer;
          +import org.apache.flink.runtime.query.TaskKvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
          +import org.apache.flink.runtime.state.internal.InternalListState;
          +import org.apache.flink.util.InstantiationUtil;
          +import org.apache.flink.util.Preconditions;
          +import org.junit.Test;
          +
          +import java.io.BufferedInputStream;
          +import java.io.FileInputStream;
          +import java.net.URL;
          +import java.util.Collections;
          +
          +import static java.util.Arrays.asList;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Mockito.mock;
          +
          +public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest {
          — End diff –

          All other tests for backwards compatibility end in `*MigrationTest`, we should probably stick to this naming pattern.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106138099

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java —
          @@ -0,0 +1,1021 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          +import org.apache.flink.runtime.state.StateTransformationFunction;
          +import org.apache.flink.util.MathUtils;
          +import org.apache.flink.util.Preconditions;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import java.util.Arrays;
          +import java.util.ConcurrentModificationException;
          +import java.util.Iterator;
          +import java.util.NoSuchElementException;
          +import java.util.TreeSet;
          +
          +/**
          + * Basis for Flink's in-memory state tables with copy-on-write support. This map does not support null values for
          + * key or namespace.
          + * <p>
          + * StateTable sacrifices some peak performance and memory efficiency for features like incremental rehashing and
          + * asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by maintaining
          + * version meta data for both, the map structure and the state objects. However, we must often proactively copy state
          + * objects when we hand them to the user.
          + * <p>
          + * As for any state backend, user should not keep references on state objects that they obtained from state backends
          + * outside the scope of the user function calls.
          + * <p>
          + * Some brief maintenance notes:
          + * <p>
          + * 1) Flattening the underlying data structure from a nested maps (namespace) -> (key) -> (state) to one flat map
          — End diff –

          Lose the `a` in `from a nested maps`, I think.
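          The maintenance note in the quoted Javadoc mentions flattening the nested (namespace) -> (key) -> (state) maps into one flat map. A minimal sketch of that idea follows, assuming the flat map is keyed by a composite of key and namespace; the quoted Javadoc is cut off before it says so, and all names here are hypothetical. The sketch shows only the flattening, not the copy-on-write or incremental rehashing that the Javadoc also mentions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: one flat map with a composite (key, namespace) key instead of nested maps. */
final class FlatStateTableSketch<K, N, S> {

    /** Composite map key; both parts take part in equals and hashCode. */
    private static final class CompositeKey<K, N> {
        private final K key;
        private final N namespace;

        CompositeKey(K key, N namespace) {
            this.key = Objects.requireNonNull(key, "key");
            this.namespace = Objects.requireNonNull(namespace, "namespace");
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return key.equals(other.key) && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace);
        }
    }

    private final Map<CompositeKey<K, N>, S> entries = new HashMap<>();

    /** A single lookup replaces the two-step namespace-then-key lookup of nested maps. */
    S get(K key, N namespace) {
        return entries.get(new CompositeKey<>(key, namespace));
    }

    void put(K key, N namespace, S state) {
        entries.put(new CompositeKey<>(key, namespace), state);
    }

    int size() {
        return entries.size();
    }
}
```

          One consequence visible even in this sketch is that listing all keys of a single namespace now requires scanning every entry, which nested maps answer directly.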

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106137929

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java —
          @@ -0,0 +1,1021 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          +import org.apache.flink.runtime.state.StateTransformationFunction;
          +import org.apache.flink.util.MathUtils;
          +import org.apache.flink.util.Preconditions;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import java.util.Arrays;
          +import java.util.ConcurrentModificationException;
          +import java.util.Iterator;
          +import java.util.NoSuchElementException;
          +import java.util.TreeSet;
          +
          +/**
          + * Basis for Flink's in-memory state tables with copy-on-write support. This map does not support null values for
          — End diff –

          Is this the basis or the actual implementation? 😉

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106138741

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java —
          @@ -0,0 +1,107 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.state.ListStateDescriptor;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.common.typeutils.base.StringSerializer;
          +import org.apache.flink.runtime.query.TaskKvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
          +import org.apache.flink.runtime.state.internal.InternalListState;
          +import org.apache.flink.util.InstantiationUtil;
          +import org.apache.flink.util.Preconditions;
          +import org.junit.Test;
          +
          +import java.io.BufferedInputStream;
          +import java.io.FileInputStream;
          +import java.net.URL;
          +import java.util.Collections;
          +
          +import static java.util.Arrays.asList;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Mockito.mock;
          +
          +public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest {
          +
          + /**
          + * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in Flink 1.3 to check
          + * the backwards compatibility of the serialization format of {@link StateTable}s.
          + */
          + @Test
          + public void testHeapKeyedStateBackend1_2To1_3BackwardsCompatibility() throws Exception {
          +
          + ClassLoader cl = getClass().getClassLoader();
          + URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
          — End diff –

          Please also add the code (commented out) that is used to re-generate this snapshot.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106137387

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java —
          @@ -0,0 +1,107 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.state.ListStateDescriptor;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.common.typeutils.base.StringSerializer;
          +import org.apache.flink.runtime.query.TaskKvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
          +import org.apache.flink.runtime.state.internal.InternalListState;
          +import org.apache.flink.util.InstantiationUtil;
          +import org.apache.flink.util.Preconditions;
          +import org.junit.Test;
          +
          +import java.io.BufferedInputStream;
          +import java.io.FileInputStream;
          +import java.net.URL;
          +import java.util.Collections;
          +
          +import static java.util.Arrays.asList;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Mockito.mock;
          +
          +public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest {
          +
          + /**
          + * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in Flink 1.3 to check
          + * the backwards compatibility of the serialization format of {@link StateTable}s.
          + */
          + @Test
          + public void testHeapKeyedStateBackend1_2To1_3BackwardsCompatibility() throws Exception {
          — End diff –

          I think this should be something like `testRestore12ToCurrent` or `testRestore12ToMaster` because that is in fact what it does. Otherwise we would have to hunt down all instances of 11To13 and rename them to 11To14 when we release.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3483#discussion_r106132624

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java —
          @@ -0,0 +1,348 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.annotation.Internal;
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataOutputView;
          +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
          +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          +import org.apache.flink.runtime.state.StateTransformationFunction;
          +
          +import java.io.IOException;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * This implementation of {@link StateTable} is based on the Flink 1.2 implementation, using nested {@link HashMap}
          — End diff –

          We don't have to mention Flink 1.2 here, simply saying that it uses nested maps should be enough.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3483

          Thanks for the review, @aljoscha. I have addressed your comments. Merging this now.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3483

          Merged in ab014ef.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3483

          srichter Stefan Richter added a comment -

          fixed in ab014ef


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              srichter Stefan Richter