  Flink / FLINK-5715

Asynchronous snapshotting for HeapKeyedStateBackend

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Labels: None

      Description

      Blocking snapshots render the HeapKeyedStateBackend practically unusable for many users in production. Their jobs cannot tolerate processing being stopped for the time it takes to write gigabytes of data from memory to disk. Asynchronous snapshots would solve this problem. The challenge for the implementation is coming up with a copy-on-write scheme for the in-memory hash maps that form the foundation of this backend. After taking a closer look, this problem is twofold. First, providing copy-on-write semantics for the hash map itself, as a mutable structure, while avoiding costly locking or blocking where possible. Second, copy-on-write for the mutable value objects, e.g. through cloning via serializers.
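      As an illustration of the second part only (a hedged sketch, not the implementation of this issue; the class, fields, and method below are hypothetical), a value still referenced by a running asynchronous snapshot can be deep-copied through its TypeSerializer before the processing thread modifies it:

          import org.apache.flink.api.common.typeutils.TypeSerializer;

          // Hypothetical sketch: lazily deep-copy a value via its TypeSerializer before it is
          // mutated while an asynchronous snapshot still references the old version.
          final class CowEntry<V> {

              private V value;
              private int copiedForVersion; // snapshot version for which this value was last copied

              V valueForWrite(TypeSerializer<V> serializer, int runningSnapshotVersion) {
                  if (copiedForVersion < runningSnapshotVersion) {
                      // The running snapshot still owns the current object: copy before writing.
                      value = serializer.copy(value);
                      copiedForVersion = runningSnapshotVersion;
                  }
                  return value;
              }
          }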

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r104690326

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java —
          @@ -68,62 +63,29 @@ public HeapReducingState(

            @Override
            public V get() {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          -
          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         return null;
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         return null;
          -     }
          -
          -     return keyedMap.get(backend.<K>getCurrentKey());
          +     return stateTable.get(currentNamespace);
            }

            @Override
            public void add(V value) throws IOException {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          +     final N namespace = currentNamespace;

                if (value == null) {
                    clear();
                    return;
                }

          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         namespaceMap = createNewMap();
          -         stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         keyedMap = createNewMap();
          -         namespaceMap.put(currentNamespace, keyedMap);
          -     }
          -
          -     V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
          +     final StateTable<K, N, V> map = stateTable;
          +     final V currentValue = map.putAndGetOld(namespace, value);

          — End diff –

          I think this leads to one more table lookup than the old version, right?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r104675456

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java —
          @@ -0,0 +1,319 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.state.heap;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataOutputView;
          +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
          +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          +
          +import java.io.IOException;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * This implementation of {@link StateTable} is based on the Flink 1.2 implementation, using nested {@link HashMap}
          — End diff –

          I think it's not necessary to mention Flink 1.2. The description you gave should be enough.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r104671557

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java —
          @@ -53,11 +53,7 @@ public boolean isImmutableType() {

          @Override
          public ArrayList<T> copy(ArrayList<T> from) {

          -     ArrayList<T> newList = new ArrayList<>(from.size());
          -     for (int i = 0; i < from.size(); i++) {
          -         newList.add(elementSerializer.copy(from.get(i)));
          -     }
          -     return newList;
          +     return new ArrayList<>(from);

          — End diff –

          The old code was "slow" on purpose. 😉 (We need the deep copy here)
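          For context, a hedged illustration of the difference inside `ArrayListSerializer#copy` (`from` and `elementSerializer` are the names from the diff above): the copy constructor duplicates only the list, while the original loop also deep-copies every element through the element serializer, which matters for mutable element types.

              // Shallow copy: the new list shares its element objects with 'from'.
              ArrayList<T> shallow = new ArrayList<>(from);

              // Deep copy: every element is duplicated through the element serializer.
              ArrayList<T> deep = new ArrayList<>(from.size());
              for (T element : from) {
                  deep.add(elementSerializer.copy(element));
              }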

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r104710900

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java —
          @@ -53,11 +53,7 @@ public boolean isImmutableType() {

          @Override
          public ArrayList<T> copy(ArrayList<T> from) {

          -     ArrayList<T> newList = new ArrayList<>(from.size());
          -     for (int i = 0; i < from.size(); i++) {
          -         newList.add(elementSerializer.copy(from.get(i)));
          -     }
          -     return newList;
          +     return new ArrayList<>(from);

          — End diff –

          Yes, you are absolutely right. I somehow overlooked the inner call to copy. Will change this back.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105126769

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java —
          @@ -7,109 +7,167 @@
            * "License"); you may not use this file except in compliance
            * with the License. You may obtain a copy of the License at
            *
            *     http://www.apache.org/licenses/LICENSE-2.0
            *
            * Unless required by applicable law or agreed to in writing, software
            * distributed under the License is distributed on an "AS IS" BASIS,
            * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            * See the License for the specific language governing permissions and
            * limitations under the License.
            */
          +
           package org.apache.flink.runtime.state.heap;

           import org.apache.flink.annotation.VisibleForTesting;
           import org.apache.flink.api.common.typeutils.TypeSerializer;
           import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
          -import org.apache.flink.runtime.state.KeyGroupRange;
          -
          -import java.util.Map;
          -
          -public class StateTable<K, N, ST> {
          -
          -    /** Map for holding the actual state objects. */
          -    private final Map<N, Map<K, ST>>[] state;
          -
          -    /** The offset to the contiguous key groups */
          -    private final int keyGroupOffset;
          -
          -    /** Combined meta information such as name and serializers for this state */
          -    private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
          -
          -    // ------------------------------------------------------------------------
          -
          -    public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) {
          -        this.metaInfo = metaInfo;
          -        this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
          -
          -        @SuppressWarnings("unchecked")
          -        Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()];
          -        this.state = state;
          -    }
          -
          -    // ------------------------------------------------------------------------
          -    //  access to maps
          -    // ------------------------------------------------------------------------
          -
          -    public Map<N, Map<K, ST>>[] getState() {
          -        return state;
          -    }
          -
          -    public Map<N, Map<K, ST>> get(int index) {
          -        final int pos = indexToOffset(index);
          -        if (pos >= 0 && pos < state.length) {
          -            return state[pos];
          -        } else {
          -            return null;
          -        }
          -    }
          -
          -    public void set(int index, Map<N, Map<K, ST>> map) {
          -        try {
          -            state[indexToOffset(index)] = map;
          -        } catch (ArrayIndexOutOfBoundsException e) {
          -            throw new IllegalArgumentException("Key group index out of range of key group range [" +
          -                    keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
          -        }
          -    }
          -
          -    private int indexToOffset(int index) {
          -        return index - keyGroupOffset;
          -    }
          -
          -    // ------------------------------------------------------------------------
          -    //  metadata
          -    // ------------------------------------------------------------------------
          -
          -    public TypeSerializer<ST> getStateSerializer() {
          +import org.apache.flink.util.Preconditions;
          +
          +/**
          + * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided
          + * through the {@link KeyContext}.
          + *
          + * @param <K> type of key
          + * @param <N> type of namespace
          + * @param <S> type of state
          + */
          +public abstract class StateTable<K, N, S> {
          +
          +    /**
          +     * The key context view on the backend. This provides information, such as the currently active key.
          +     */
          +    protected final KeyContext<K> keyContext;
          +
          +    /**
          +     * Combined meta information such as name and serializers for this state
          +     */
          +    protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
          +
          +    /**
          +     *
          +     * @param keyContext the key context provides the key scope for all put/get/delete operations.
          +     * @param metaInfo the meta information, including the type serializer for state copy-on-write.
          +     */
          +    public StateTable(KeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
          +        this.keyContext = Preconditions.checkNotNull(keyContext);
          +        this.metaInfo = Preconditions.checkNotNull(metaInfo);
          +    }
          +
          +    // Main interface methods of StateTable -------------------------------------------------------
          +
          +    /**
          +     * Returns whether this {@link NestedMapsStateTable} is empty.
          +     *
          +     * @return {@code true} if this {@link NestedMapsStateTable} has no elements, {@code false}
          +     * otherwise.
          +     * @see #size()
          +     */
          +    public boolean isEmpty() {
          +        return size() == 0;
          +    }
          +
          +    /**
          +     * Returns the total number of entries in this {@link NestedMapsStateTable}. This is the sum of both sub-tables.
          +     *
          +     * @return the number of entries in this {@link NestedMapsStateTable}.
          +     */
          +    public abstract int size();
          +
          +    /**
          +     * Returns the value of the mapping for the composite of active key and given namespace.
          +     *
          +     * @param namespace the namespace. Not null.
          +     * @return the value of the mapping with the specified key/namespace composite key, or {@code null}
          +     * if no mapping for the specified key is found.
          +     */
          +    public abstract S get(Object namespace);
          +
          +    /**
          +     * Returns whether this table contains a mapping for the composite of active key and given namespace.
          +     *
          +     * @param namespace the namespace in the composite key to search for. Not null.
          +     * @return {@code true} if this map contains the specified key/namespace composite key,
          +     * {@code false} otherwise.
          +     */
          +    public abstract boolean containsKey(Object namespace);
          +
          +    /**
          +     * Maps the composite of active key and given namespace to the specified value. This method should be preferred
          +     * over {@link #putAndGetOld(Object, Object)} when the caller is not interested in the old value.
          +     *
          +     * @param namespace the namespace. Not null.
          +     * @param state the value. Can be null.
          +     */
          +    public abstract void put(N namespace, S state);
          +
          +    /**
          +     * Maps the composite of active key and given namespace to the specified value. Returns the previous state that
          +     * was registered under the composite key.
          +     *
          +     * @param namespace the namespace. Not null.
          +     * @param state the value. Can be null.
          +     * @return the value of any previous mapping with the specified key or
          +     * {@code null} if there was no such mapping.
          +     */
          +    public abstract S putAndGetOld(N namespace, S state);
          +
          +    /**
          +     * Removes the mapping for the composite of active key and given namespace. This method should be preferred
          +     * over {@link #removeAndGetOld(Object)} when the caller is not interested in the old value.
          +     *
          +     * @param namespace the namespace of the mapping to remove. Not null.
          +     */
          +    public abstract void remove(Object namespace);

          — End diff –

          Sometimes, the namespace is typed to `N`, sometimes to `Object`. Is that an optimization or coincidence?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105126046

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java —
          @@ -233,7 +231,8 @@ public void testMerging() throws Exception {
          StringSerializer.INSTANCE,
          HeapListStateTest.class.getClassLoader(),
          16,

          -     new KeyGroupRange(0, 15));
          +     new KeyGroupRange(0, 15),
          +     (System.currentTimeMillis() & 1) == 1);

          — End diff –

          An `@Parameterized` test usually makes that quite easy...
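          A hedged sketch of what such a setup could look like with JUnit 4 (the class and method names below are illustrative, not the actual test):

              import java.util.Arrays;
              import java.util.Collection;

              import org.junit.runner.RunWith;
              import org.junit.runners.Parameterized;

              // Illustrative sketch: run the heap state tests for both snapshot modes
              // instead of picking one at random per run.
              @RunWith(Parameterized.class)
              public class HeapListStateParameterizedTest {

                  @Parameterized.Parameters(name = "asynchronousSnapshots = {0}")
                  public static Collection<Boolean> snapshotModes() {
                      return Arrays.asList(false, true);
                  }

                  @Parameterized.Parameter
                  public boolean asynchronousSnapshots;

                  // Each test would then pass 'asynchronousSnapshots' to the backend under test
                  // instead of (System.currentTimeMillis() & 1) == 1.
              }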

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105124813

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          We are getting a bit of a constructor explosion here (also because there are even more options coming with a pending refactoring on my side). Can we make this flag configurable by a setter instead?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105125855

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java —
          @@ -233,7 +231,8 @@ public void testMerging() throws Exception {
          StringSerializer.INSTANCE,
          HeapListStateTest.class.getClassLoader(),
          16,

          -     new KeyGroupRange(0, 15));
          +     new KeyGroupRange(0, 15),
          +     (System.currentTimeMillis() & 1) == 1);

          — End diff –

          Why not test both combinations here, rather than randomly one of the two?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105127429

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java —
          @@ -68,62 +63,29 @@ public HeapReducingState(

            @Override
            public V get() {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          -
          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         return null;
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         return null;
          -     }
          -
          -     return keyedMap.get(backend.<K>getCurrentKey());
          +     return stateTable.get(currentNamespace);
            }

            @Override
            public void add(V value) throws IOException {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          +     final N namespace = currentNamespace;

                if (value == null) {
                    clear();
                    return;
                }

          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         namespaceMap = createNewMap();
          -         stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         keyedMap = createNewMap();
          -         namespaceMap.put(currentNamespace, keyedMap);
          -     }
          -
          -     V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
          +     final StateTable<K, N, V> map = stateTable;
          +     final V currentValue = map.putAndGetOld(namespace, value);

          — End diff –

          Why not push the Reduce into the table, e.g. `map.reduce(namespace, value, ReduceFunction)`? That gives the minimal number of navigations possible, similar to using `putIfAbsent()` rather than a `get()`/`put()`.
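          A hedged sketch of what such a push-down might look like on the abstract `StateTable` (the method below is illustrative only and not part of the PR as reviewed; a concrete table could override it to resolve the entry in a single navigation of its internal map, similar to `java.util.Map#merge`):

              import org.apache.flink.api.common.functions.ReduceFunction;

              // Illustrative default inside StateTable<K, N, S>; subclasses could fuse lookup and update.
              public void reduce(N namespace, S value, ReduceFunction<S> reduceFunction) throws Exception {
                  final S previous = get(namespace);
                  put(namespace, previous == null ? value : reduceFunction.reduce(previous, value));
              }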

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3466

          I think this is all in all very good code!

          One thing I am worried about is the testing time now. The `EventTimeWindowCheckpointingITCase` tests already take very long, and now we have two more.

          What we should probably do is the following:

          • The data volume is very high in that test, and I think that was mainly done to stress RocksDB's async snapshots a bit.
          • The heaviness can be moved to a RocksDB-specific async snapshot test (that does not need to use windows).
          • The base of the EventTimeWindowCheckpointingITCases can then be made much more lightweight.
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105129267

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java —
          @@ -68,62 +63,29 @@ public HeapReducingState(

            @Override
            public V get() {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          -
          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         return null;
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         return null;
          -     }
          -
          -     return keyedMap.get(backend.<K>getCurrentKey());
          +     return stateTable.get(currentNamespace);
            }

            @Override
            public void add(V value) throws IOException {
          -     Preconditions.checkState(currentNamespace != null, "No namespace set.");
          -     Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
          +     final N namespace = currentNamespace;

                if (value == null) {
                    clear();
                    return;
                }

          -     Map<N, Map<K, V>> namespaceMap =
          -             stateTable.get(backend.getCurrentKeyGroupIndex());
          -
          -     if (namespaceMap == null) {
          -         namespaceMap = createNewMap();
          -         stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
          -     }
          -
          -     Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
          -
          -     if (keyedMap == null) {
          -         keyedMap = createNewMap();
          -         namespaceMap.put(currentNamespace, keyedMap);
          -     }
          -
          -     V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
          +     final StateTable<K, N, V> map = stateTable;
          +     final V currentValue = map.putAndGetOld(namespace, value);

          — End diff –

          I have already generalized and implemented the push-down as part of #3483 (to avoid too much rebasing). It would be nice if you could also take a look at that.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105129552

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          I agree that the explosion is not nice, and it is certainly easy to change that. I just have a preference for initializing the configuration as eagerly and as `final` as possible.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105131028

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java —
          @@ -233,7 +231,8 @@ public void testMerging() throws Exception {
          StringSerializer.INSTANCE,
          HeapListStateTest.class.getClassLoader(),
          16,

          -     new KeyGroupRange(0, 15));
          +     new KeyGroupRange(0, 15),
          +     (System.currentTimeMillis() & 1) == 1);

          — End diff –

          Good idea.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105389424

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          We are getting one more parameter into the constructors with the change that makes the state backend handle all checkpoint/savepoint storage related business. That must be a constructor parameter, so if we can avoid further constructor parameters, that would help. Otherwise we really end up with 20 constructors.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r105390074

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          I guess this is typically a case where you could go for a builder pattern; the only problem I see here is that this would be API-breaking. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r106385143

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          For now, I suggest we keep it as is; as soon as your PR goes in, we can have a follow-up that introduces a single entry point to a builder for all backend-related configuration options, which helps the user see and navigate all options in a more centralized way, e.g. `KeyedBackends.heap().async(true).create()`.
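          For illustration, such an entry point might look roughly like the following; `KeyedBackends` and its builder are purely hypothetical here and do not exist in Flink at this point:

              import org.apache.flink.runtime.state.AbstractStateBackend;

              // Hypothetical sketch of the proposed centralized configuration entry point.
              public final class KeyedBackends {

                  public static HeapBackendBuilder heap() {
                      return new HeapBackendBuilder();
                  }

                  public static final class HeapBackendBuilder {

                      private boolean asynchronousSnapshots;

                      public HeapBackendBuilder async(boolean asynchronousSnapshots) {
                          this.asynchronousSnapshots = asynchronousSnapshots;
                          return this;
                      }

                      public AbstractStateBackend create() {
                          // A real implementation would construct the configured heap-based backend here;
                          // omitted to avoid asserting a concrete constructor signature.
                          throw new UnsupportedOperationException("illustrative sketch only");
                      }
                  }
              }

          Usage would then read `KeyedBackends.heap().async(true).create()`, as in the comment above.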

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3466#discussion_r106399854

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java —
          @@ -97,6 +100,27 @@ public FsStateBackend(String checkpointDataUri) throws IOException {
          *

             * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
             *                          and the path to the checkpoint data directory.
          +  * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
          +  *
          +  * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
          +  */
          + public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {

          — End diff –

          Sounds good!

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3466

          Merged in ab014ef.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3466

          srichter Stefan Richter added a comment -

          Fixed in ab014ef.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3602

          FLINK-5715 Asynchronous snapshots for heap keyed state backend (BACKPORT)

          Backport of PR #3466 from 1.3-snapshot to 1.2. Introduces asynchronous snapshots for heap keyed state backend.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink async-backport

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3602.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3602


          commit c40a8d18a5b1c9f3ccd84701bbe1bd7d4a00924c
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-23T10:36:56Z

          FLINK-5715 Asynchronous snapshots for heap-based keyed state backend (backport from 1.3)


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3602

          CC @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3602#discussion_r107931282

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java —
          @@ -115,6 +117,14 @@ public static void stopTestCluster() {
          @Before
          public void initStateBackend() throws IOException {
          switch (stateBackendEnum) {
          + case MEM_ASYNC:
          + this.stateBackend = new AsyncMemoryStateBackend(MAX_MEM_STATE_SIZE);
          + break;
          + case FILE_ASYNC: {
          + String backups = tempFolder.newFolder().getAbsolutePath();
          + this.stateBackend = new AsyncFsStateBackend("file://" + backups);
          — End diff –

          To make this work cross platform, always use `file.toUri()` or `new Path(file.toUri())`.
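
          As an illustration of the suggested fix, a sketch only: it assumes `AsyncFsStateBackend` accepts a URI string just like the `"file://" + path` form above, and it reuses the `tempFolder` (a JUnit `TemporaryFolder`) from the quoted test.

          // Derive the checkpoint URI from the File's own URI instead of prepending
          // "file://" to an absolute path, so Windows paths (e.g. C:\...) are encoded correctly.
          java.io.File backups = tempFolder.newFolder();
          this.stateBackend = new AsyncFsStateBackend(backups.toURI().toString());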

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3602#discussion_r107935639

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java —
          @@ -115,6 +117,14 @@ public static void stopTestCluster() {
          @Before
          public void initStateBackend() throws IOException {
          switch (stateBackendEnum) {
          + case MEM_ASYNC:
          + this.stateBackend = new AsyncMemoryStateBackend(MAX_MEM_STATE_SIZE);
          + break;
          + case FILE_ASYNC: {
          + String backups = tempFolder.newFolder().getAbsolutePath();
          + this.stateBackend = new AsyncFsStateBackend("file://" + backups);
          — End diff –

          That is true. The same issue must already exist in `case FILE`, since this block is basically a copy-paste of that branch. I will fix both.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3602

          Thanks for the review, @StephanEwen ! I agree with undoing `MemoryStateBackend` and `MathUtil` as proposed, but would argue for keeping the change in `TimeWindow`. It fixes skew problems with the old way the hash code was generated: since the inputs are timestamps, the old approach typically used only a small bandwidth of bits in the hash code. This skew can have a particularly strong impact on the new, flattened hash map structure of the async backends.
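
          To make the skew argument concrete, a small illustration (not the actual `TimeWindow` code): window timestamps are large, closely spaced longs, so a naive fold into an int varies only in its low-order bits, while a bit-mixing finalizer (MurmurHash3's fmix64 is used here as an example) spreads the entropy across all 32 bits.

          long start = 1490262000000L;        // hypothetical window start (epoch millis)
          long end   = start + 5_000L;        // hypothetical window end
          int naive  = (int) (start + end);   // consecutive windows differ only in a few low bits

          // MurmurHash3 fmix64 finalizer as one example of bit mixing:
          long h = start + end;
          h ^= h >>> 33;
          h *= 0xff51afd7ed558ccdL;
          h ^= h >>> 33;
          h *= 0xc4ceb9fe1a85ec53L;
          h ^= h >>> 33;
          int mixed = (int) (h ^ (h >>> 32)); // hash bits are spread over the full int range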

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3602

          After a discussion with @StephanEwen , we decided to follow my proposal. Merging this now.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3602

          Merged in c6a80725053c49dd2064405577291bdc86c82003.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3602


            People

            • Assignee: srichter Stefan Richter
            • Reporter: srichter Stefan Richter
            • Votes: 0
            • Watchers: 9
