  1. Flink
  2. FLINK-5530

race condition in AbstractRocksDBState#getSerializedValue

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Queryable State
    • Labels:
      None

      Description

      AbstractRocksDBState#getSerializedValue() uses the same key serialisation stream as the ordinary state access methods, but it is called in parallel during state queries, thus violating the assumption that only one thread accesses the stream.

      This may lead to either wrong results in queries or corrupt data while queries are executed.
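The hazard described above can be illustrated with a small stand-alone sketch. It is not the Flink code: `SharedKeyBufferSketch`, its methods, and the use of plain `java.io.ByteArrayOutputStream` with `int` keys and namespaces are all assumptions made for illustration. It replays one unlucky interleaving on a single thread, so the outcome is deterministic: a query that reuses the shared stream resets it in the middle of an ordinary access, while a query that serialises into its own local stream leaves it alone.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class SharedKeyBufferSketch {
    // Hypothetical stand-in for the member stream reused by the ordinary state access methods.
    private final ByteArrayOutputStream sharedStream = new ByteArrayOutputStream(128);

    /** Appends v as four big-endian bytes. */
    static void writeInt(ByteArrayOutputStream out, int v) {
        for (int i = 4; --i >= 0;) {
            out.write(v >>> (i << 3));
        }
    }

    /** Resets the given stream and serialises (key, namespace) into it. */
    static byte[] serializeKey(int key, int namespace, ByteArrayOutputStream out) {
        out.reset();
        writeInt(out, key);
        writeInt(out, namespace);
        return out.toByteArray();
    }

    /** Query path: reuses the shared member (the bug) or a fresh local stream (the fix). */
    byte[] query(int key, int namespace, boolean reuseSharedStream) {
        ByteArrayOutputStream out =
            reuseSharedStream ? sharedStream : new ByteArrayOutputStream(128);
        return serializeKey(key, namespace, out);
    }

    /** Replays one bad interleaving: a query runs between the two writes of an ordinary access. */
    byte[] ordinaryAccessInterruptedByQuery(int key, int namespace, boolean queryReusesSharedStream) {
        sharedStream.reset();
        writeInt(sharedStream, key);
        query(99, 99, queryReusesSharedStream); // stands in for the concurrent query thread
        writeInt(sharedStream, namespace);
        return sharedStream.toByteArray();
    }

    public static void main(String[] args) {
        SharedKeyBufferSketch s = new SharedKeyBufferSketch();
        // Shared stream: the ordinary access ends up with the query's bytes mixed in.
        System.out.println(Arrays.toString(s.ordinaryAccessInterruptedByQuery(1, 2, true)));
        // Local stream: the ordinary access sees exactly its own key and namespace.
        System.out.println(Arrays.toString(s.ordinaryAccessInterruptedByQuery(1, 2, false)));
    }
}
```

With real threads the interleaving is not fixed, which is why the symptom can be either a wrong query result or a corrupted key on the regular access path.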

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3143

          FLINK-5530 fix race condition in AbstractRocksDBState#getSerializedValue

          `AbstractRocksDBState#getSerializedValue()` uses the same key serialisation
          stream as the ordinary state access methods but is called in parallel during
          state queries thus violating the assumption of only one thread accessing it.

          This may lead to either wrong results in queries or corrupt data while queries
          are executed.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-5530

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3143.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3143


          commit 7fa4c61a04cb96b94907e6ec3803b994d3c6643d
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-01-17T16:38:29Z

          FLINK-5530 fix race condition in AbstractRocksDBState#getSerializedValue

          AbstractRocksDBState#getSerializedValue() uses the same key serialisation
          stream as the ordinary state access methods but is called in parallel during
          state queries thus violating the assumption of only one thread accessing it.

          This may lead to either wrong results in queries or corrupt data while queries
          are executed.


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96476317

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());

          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may this contain garbage
          --- End diff ---

          this -> thus?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96613631

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());

          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +            new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +            new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
          +        N namespace,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   keySerializationStream.reset();
          -        writeKeyGroup(keyGroup);
          -        writeKey(key);
          -        writeNameSpace(namespace);
          +        writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
          +        writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
          +        writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
               }

          -    private void writeKeyGroup(int keyGroup) throws IOException {
          +    private void writeKeyGroup(int keyGroup,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                       keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
                   }
               }

          -    private void writeKey(K key) throws IOException {
          +    private void writeKey(K key,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   //write key
                   int beforeWrite = keySerializationStream.getPosition();
                   backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write size of key
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeNameSpace(N namespace) throws IOException {
          +    private void writeNameSpace(N namespace,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   int beforeWrite = keySerializationStream.getPosition();
                   namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write length of namespace
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeLengthFrom(int fromPosition) throws IOException {
          +    private static void writeLengthFrom(int fromPosition,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          --- End diff ---

          I think other parts of the Flink code do not break between `throws` and the exception list. It would be nice to keep the style consistent across the code.
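For readers following the diff, the `writeKeyGroup` loop writes the key-group index as a fixed number of bytes, most significant byte first. The following is a minimal stand-alone sketch of that loop only. The class `KeyGroupPrefixSketch`, the `prefixBytes` parameter (standing in for `backend.getKeyGroupPrefixBytes()`), and the use of `java.io.ByteArrayOutputStream` in place of Flink's `DataOutputView` are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class KeyGroupPrefixSketch {
    /**
     * Writes the lowest prefixBytes bytes of keyGroup, most significant first,
     * using the same loop shape as writeKeyGroup in the diff.
     */
    static byte[] encodeKeyGroup(int keyGroup, int prefixBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(prefixBytes);
        for (int i = prefixBytes; --i >= 0;) {
            // i << 3 is i * 8: shift the wanted byte into the low 8 bits;
            // write(int) keeps only those low 8 bits.
            out.write(keyGroup >>> (i << 3));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 258 == 0x0102, so a two-byte prefix is [1, 2].
        System.out.println(Arrays.toString(encodeKeyGroup(258, 2)));
    }
}
```

Because the loop only touches the output it is handed, passing the stream and view in as parameters (as the patch does) is enough to let a query serialise its key into a private buffer.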

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96612773

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());

          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          --- End diff ---

          You don't need to wrap that heavily; most of the code uses a line length of 120 characters.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96612430

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
          @@ -242,6 +245,132 @@ public void testValueState() throws Exception {
          backend.dispose();
          }

          + /**
          + * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
          + * accessing the state concurrently. They should not get in the way of each
          + * other.
          + */
          + @Test
          + @SuppressWarnings("unchecked")
          + public void testValueStateRace() throws Exception {
          + final AbstractKeyedStateBackend<Integer> backend =
          + createKeyedBackend(IntSerializer.INSTANCE);
          + final Integer namespace = Integer.valueOf(1);
          +
          + final ValueStateDescriptor<String> kvId =
          + new ValueStateDescriptor<>("id", String.class);
          + kvId.initializeSerializerUnlessSet(new ExecutionConfig());
          +
          + final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
          + final TypeSerializer<Integer> namespaceSerializer =
          + IntSerializer.INSTANCE;
          + final TypeSerializer<String> valueSerializer = kvId.getSerializer();
          +
          + final ValueState<String> state = backend
          + .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
          +
          + @SuppressWarnings("unchecked")
          + final KvState<Integer> kvState = (KvState<Integer>) state;
          +
          + /**
          + * 1) Test that ValueState#value() before and after
          + * KvState#getSerializedValue(byte[]) return the same value.
          + */
          +
          + // set some key and namespace
          + final int key1 = 1;
          + backend.setCurrentKey(key1);
          + kvState.setCurrentNamespace(2);
          + state.update("2");
          + assertEquals("2", state.value());
          +
          + // query another key and namespace
          + assertNull(getSerializedValue(kvState, 3, keySerializer,
          + namespace, IntSerializer.INSTANCE,
          + valueSerializer));
          +
          + // the state should not have changed!
          + assertEquals("2", state.value());
          +
          + // re-set values
          + kvState.setCurrentNamespace(namespace);
          +
          + /**
          + * 2) Test two threads concurrently using ValueState#value() and
          + * KvState#getSerializedValue(byte[]).
          + */
          +
          + // some modifications to the state
          + final int key2 = 10;
          + backend.setCurrentKey(key2);
          + assertNull(state.value());
          + assertNull(getSerializedValue(kvState, key2, keySerializer,
          + namespace, namespaceSerializer, valueSerializer));
          + state.update("1");
          +
          + boolean getterSuccess;
          + final Throwable[] throwables = {null, null};
          +
          + final Thread getter = new Thread("State getter") {
          --- End diff ---

          How about using `CheckedThread` to avoid the bookkeeping with Throwable arrays, etc.?
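The pattern suggested here is a thread that remembers whatever its body throws and rethrows it when the test waits for it, so the test needs no shared `Throwable[]`. The sketch below is a minimal stand-in written for illustration, not Flink's `CheckedThread` itself. The names `CheckedThreadSketch`, `CapturingThread` and `runAndReport` are hypothetical, and the `go()`/`sync()` shape is assumed to mirror the Flink test utility.

```java
public class CheckedThreadSketch {
    /** Hypothetical stand-in: runs go(), remembers any Throwable, rethrows it from sync(). */
    abstract static class CapturingThread extends Thread {
        private volatile Throwable error;

        CapturingThread(String name) {
            super(name);
        }

        /** The test logic; may throw freely. */
        abstract void go() throws Exception;

        @Override
        public final void run() {
            try {
                go();
            } catch (Throwable t) {
                error = t; // kept for the thread that later calls sync()
            }
        }

        /** Waits for the thread to finish and rethrows whatever go() threw. */
        void sync() throws Exception {
            join();
            Throwable t = error;
            if (t instanceof Error) {
                throw (Error) t;
            }
            if (t instanceof Exception) {
                throw (Exception) t;
            }
        }
    }

    /** Returns the message of the failure surfaced by sync(), or null if go() succeeded. */
    static String runAndReport(final boolean fail) {
        CapturingThread getter = new CapturingThread("State getter") {
            @Override
            void go() throws Exception {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
            }
        };
        getter.start();
        try {
            getter.sync();
            return null;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(false));
        System.out.println(runAndReport(true));
    }
}
```

In a test, a failing assertion inside `go()` would then fail the test method itself once `sync()` is called, rather than only killing the helper thread.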

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96613474

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());

          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +            new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +            new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
          +        N namespace,
          --- End diff ---

          Just my personal opinion, but indenting parameters differently than the method body helps readability (say two indentations for the parameter list).

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96638451

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
          @@ -242,6 +245,132 @@ public void testValueState() throws Exception {
          backend.dispose();
          }

          + /**
          + * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
          + * accessing the state concurrently. They should not get in the way of each
          + * other.
          + */
          + @Test
          + @SuppressWarnings("unchecked")
          + public void testValueStateRace() throws Exception {
          + final AbstractKeyedStateBackend<Integer> backend =
          + createKeyedBackend(IntSerializer.INSTANCE);
          + final Integer namespace = Integer.valueOf(1);
          +
          + final ValueStateDescriptor<String> kvId =
          + new ValueStateDescriptor<>("id", String.class);
          + kvId.initializeSerializerUnlessSet(new ExecutionConfig());
          +
          + final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
          + final TypeSerializer<Integer> namespaceSerializer =
          + IntSerializer.INSTANCE;
          + final TypeSerializer<String> valueSerializer = kvId.getSerializer();
          +
          + final ValueState<String> state = backend
          + .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
          +
          + @SuppressWarnings("unchecked")
          + final KvState<Integer> kvState = (KvState<Integer>) state;
          +
          + /**
          + * 1) Test that ValueState#value() before and after
          + * KvState#getSerializedValue(byte[]) return the same value.
          + */
          +
          + // set some key and namespace
          + final int key1 = 1;
          + backend.setCurrentKey(key1);
          + kvState.setCurrentNamespace(2);
          + state.update("2");
          + assertEquals("2", state.value());
          +
          + // query another key and namespace
          + assertNull(getSerializedValue(kvState, 3, keySerializer,
          + namespace, IntSerializer.INSTANCE,
          + valueSerializer));
          +
          + // the state should not have changed!
          + assertEquals("2", state.value());
          +
          + // re-set values
          + kvState.setCurrentNamespace(namespace);
          +
          + /**
          + * 2) Test two threads concurrently using ValueState#value() and
          + * KvState#getSerializedValue(byte[]).
          + */
          +
          + // some modifications to the state
          + final int key2 = 10;
          + backend.setCurrentKey(key2);
          + assertNull(state.value());
          + assertNull(getSerializedValue(kvState, key2, keySerializer,
          + namespace, namespaceSerializer, valueSerializer));
          + state.update("1");
          +
          + boolean getterSuccess;
          + final Throwable[] throwables = {null, null};
          +
          + final Thread getter = new Thread("State getter") {
          --- End diff ---

          nice replacement - didn't know about that class

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96638478

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          End diff –

          Yes, I saw very long lines here and there, but IMHO 120 is quite long and affects readability as well - and so does wrapping lines, to some extent... I will go up to 90 instead.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96638594

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +                new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +                new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
          +        N namespace,
          End diff –

          it certainly helps - didn't see how to configure my IntelliJ style to automatically do that though
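          The core of the change quoted above is that the query path no longer writes into the shared `keySerializationStream` member but into a buffer created per call. A minimal sketch of that pattern follows, assuming plain JDK streams in place of Flink's `ByteArrayOutputStreamWithPos` and `DataOutputViewStreamWrapper`; the class `KeyBufferSketch` and its method names are hypothetical and not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a keyed state object; not Flink API. */
class KeyBufferSketch {

    // Member buffer, reused across calls: safe only while one thread uses it.
    private final ByteArrayOutputStream sharedStream = new ByteArrayOutputStream(128);
    private final DataOutputStream sharedView = new DataOutputStream(sharedStream);

    /** Ordinary state access (single task thread): reuses the member buffer. */
    byte[] currentKeyBytes(int keyGroup, int key, int namespace) {
        return serialize(keyGroup, key, namespace, sharedStream, sharedView);
    }

    /** Query path (may run on other threads): allocates a private buffer per call. */
    byte[] queryKeyBytes(int keyGroup, int key, int namespace) {
        ByteArrayOutputStream tmpStream = new ByteArrayOutputStream(128);
        DataOutputStream tmpView = new DataOutputStream(tmpStream);
        return serialize(keyGroup, key, namespace, tmpStream, tmpView);
    }

    // Static on purpose: it can only touch the buffers it is handed.
    private static byte[] serialize(int keyGroup, int key, int namespace,
            ByteArrayOutputStream stream, DataOutputStream view) {
        try {
            stream.reset();
            view.writeByte(keyGroup);
            view.writeInt(key);
            view.writeInt(namespace);
            return stream.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

          Passing the stream and view as arguments, as the patch does, lets both paths share one serialization routine while each keeps control over which buffer is written. The per-call allocation costs a little on the query path, which is presumably acceptable because queries are far less frequent than ordinary state accesses.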

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r96638710

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +                new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +                new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
          +        N namespace,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   keySerializationStream.reset();
          -        writeKeyGroup(keyGroup);
          -        writeKey(key);
          -        writeNameSpace(namespace);
          +        writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
          +        writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
          +        writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
               }

          -    private void writeKeyGroup(int keyGroup) throws IOException {
          +    private void writeKeyGroup(int keyGroup,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                       keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
                   }
               }

          -    private void writeKey(K key) throws IOException {
          +    private void writeKey(K key,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   //write key
                   int beforeWrite = keySerializationStream.getPosition();
                   backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write size of key
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeNameSpace(N namespace) throws IOException {
          +    private void writeNameSpace(N namespace,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          +
                   int beforeWrite = keySerializationStream.getPosition();
                   namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write length of namespace
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeLengthFrom(int fromPosition) throws IOException {
          +    private static void writeLengthFrom(int fromPosition,
          +        final ByteArrayOutputStreamWithPos keySerializationStream,
          +        final DataOutputView keySerializationDateDataOutputView) throws
          +        IOException {
          End diff –

          I'll try to do that manually, too, as I didn't see a way to teach my IntelliJ style that behaviour
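          The `writeKeyGroup` loop in the diff above is compact enough to be easy to misread: it emits the key group as a fixed number of prefix bytes, most significant byte first. The sketch below reproduces just that loop against a JDK `DataOutputStream`; the class `KeyGroupPrefixSketch` and the `prefixBytes` parameter are hypothetical stand-ins for the state class and `backend.getKeyGroupPrefixBytes()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper illustrating the key-group prefix loop; not Flink API. */
class KeyGroupPrefixSketch {

    /** Writes the lowest prefixBytes bytes of keyGroup, most significant byte first. */
    static byte[] keyGroupPrefix(int keyGroup, int prefixBytes) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream(prefixBytes);
        DataOutputStream view = new DataOutputStream(stream);
        try {
            for (int i = prefixBytes; --i >= 0;) {
                // i << 3 equals i * 8: shift byte number i down into the low 8 bits;
                // writeByte keeps only those low 8 bits.
                view.writeByte(keyGroup >>> (i << 3));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stream.toByteArray();
    }
}
```

          With two prefix bytes, key group 300 (0x012C) becomes the bytes 0x01, 0x2C, so keys of the same key group share a common prefix and sort together byte-wise.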

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r97049360

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          End diff –

          Please try to use the same style as the remainder of the project. I know that every programmer in the world has figured out the perfect code style and is very opinionated on that.

          Keeping a coherent style in a project is worth a lot and worth compromising for...

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r97059850

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +                new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +                new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(
          +            int keyGroup, K key, N namespace,
          +            final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   keySerializationStream.reset();
          -        writeKeyGroup(keyGroup);
          -        writeKey(key);
          -        writeNameSpace(namespace);
          +        writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
          +        writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
          +        writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
               }

          -    private void writeKeyGroup(int keyGroup) throws IOException {
          +    private void writeKeyGroup(
          +            int keyGroup, final DataOutputView keySerializationDateDataOutputView)
          +            throws IOException {
          +
                   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                       keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
                   }
               }

          -    private void writeKey(K key) throws IOException {
          +    private void writeKey(
          +            K key, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   //write key
                   int beforeWrite = keySerializationStream.getPosition();
                   backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write size of key
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeNameSpace(N namespace) throws IOException {
          +    private void writeNameSpace(
          +            N namespace, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   int beforeWrite = keySerializationStream.getPosition();
                   namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write length of namespace
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeLengthFrom(int fromPosition) throws IOException {
          +    private static void writeLengthFrom(
          +            int fromPosition, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   int length = keySerializationStream.getPosition() - fromPosition;
          -        writeVariableIntBytes(length);
          +        writeVariableIntBytes(length, keySerializationDateDataOutputView);
               }

          -    private void writeVariableIntBytes(int value) throws IOException {
          +    private static void writeVariableIntBytes(
          +            int value, final DataOutputView keySerializationDateDataOutputView)
          End diff –

          Why are the output view arguments declared `final` here and in other methods? Usually we only mark method arguments as final if required for anonymous classes, etc.
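          To illustrate the convention mentioned in this comment, here is a small sketch of the one case where `final` on a parameter is actually needed: capture by an anonymous class when compiling for Java 7 (from Java 8 on, an effectively final parameter is enough). The class `FinalParameterSketch` and its methods are hypothetical examples, not code from the pull request.

```java
/** Hypothetical example of when a method parameter has to be final; not Flink code. */
class FinalParameterSketch {

    /**
     * Both parameters are captured by the anonymous Runnable. Under Java 7 rules
     * they must be declared final; since Java 8, being effectively final suffices.
     */
    static Runnable appender(final StringBuilder target, final String text) {
        return new Runnable() {
            @Override
            public void run() {
                target.append(text);
            }
        };
    }

    /** Nothing is captured here, so final would only guard against reassignment. */
    static int lengthOf(String text) {
        return text.length();
    }
}
```

          In methods like `lengthOf`, the modifier is purely a matter of style, which is why a project-wide convention decides whether it appears.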

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3143#discussion_r97065423

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java —
          @@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
                           namespaceSerializer);

                   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
          -        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
          -        return backend.db.get(columnFamily, keySerializationStream.toByteArray());
          +
          +        // we cannot reuse the keySerializationStream member since this method
          +        // is called concurrently to the other ones and it may thus contain garbage
          +        ByteArrayOutputStreamWithPos tmpKeySerializationStream =
          +                new ByteArrayOutputStreamWithPos(128);
          +        DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
          +                new DataOutputViewStreamWrapper(tmpKeySerializationStream);
          +
          +        writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
          +            tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
          +
          +        return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
               }

               protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
          -        writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
          +        writeKeyWithGroupAndNamespace(
          +            backend.getCurrentKeyGroupIndex(),
          +            backend.getCurrentKey(),
          +            currentNamespace,
          +            this.keySerializationStream,
          +            this.keySerializationDateDataOutputView);
               }

          -    protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
          +    protected void writeKeyWithGroupAndNamespace(
          +            int keyGroup, K key, N namespace,
          +            final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   keySerializationStream.reset();
          -        writeKeyGroup(keyGroup);
          -        writeKey(key);
          -        writeNameSpace(namespace);
          +        writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
          +        writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
          +        writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
               }

          -    private void writeKeyGroup(int keyGroup) throws IOException {
          +    private void writeKeyGroup(
          +            int keyGroup, final DataOutputView keySerializationDateDataOutputView)
          +            throws IOException {
          +
                   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                       keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
                   }
               }

          -    private void writeKey(K key) throws IOException {
          +    private void writeKey(
          +            K key, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   //write key
                   int beforeWrite = keySerializationStream.getPosition();
                   backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write size of key
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeNameSpace(N namespace) throws IOException {
          +    private void writeNameSpace(
          +            N namespace, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   int beforeWrite = keySerializationStream.getPosition();
                   namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);

                   if (ambiguousKeyPossible) {
                       //write length of namespace
          -            writeLengthFrom(beforeWrite);
          +            writeLengthFrom(beforeWrite, keySerializationStream,
          +                keySerializationDateDataOutputView);
                   }
               }

          -    private void writeLengthFrom(int fromPosition) throws IOException {
          +    private static void writeLengthFrom(
          +            int fromPosition, final ByteArrayOutputStreamWithPos keySerializationStream,
          +            final DataOutputView keySerializationDateDataOutputView) throws IOException {
          +
                   int length = keySerializationStream.getPosition() - fromPosition;
          -        writeVariableIntBytes(length);
          +        writeVariableIntBytes(length, keySerializationDateDataOutputView);
               }

          -    private void writeVariableIntBytes(int value) throws IOException {
          +    private static void writeVariableIntBytes(
          +            int value, final DataOutputView keySerializationDateDataOutputView)
          End diff –

          It is just a habit of mine, so that I don't accidentally reassign a variable that is not supposed to change in the current implementation of the method.
          I agree, however, that it is not applied consistently across the parameters of each method, e.g. the `value` parameter in your example. I'll remove the `final` modifiers.
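
          For readers following the diff, here is a minimal sketch of the pattern it applies: the write helpers take the target stream as a parameter instead of using a shared member, so the query path can pass in a buffer of its own. This is not the Flink code. The class and method names are hypothetical, plain `java.io` streams stand in for `ByteArrayOutputStreamWithPos` and `DataOutputViewStreamWrapper`, and `String` keys written with `writeUTF` stand in for the key and namespace serializers. It also shows what the `final` parameter modifiers under discussion guard against.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class KeySerializationSketch {

    // Shared buffer: only safe while a single thread uses it
    // (stands in for the keySerializationStream member in the diff).
    private final ByteArrayOutputStream sharedStream = new ByteArrayOutputStream(128);
    private final DataOutputStream sharedView = new DataOutputStream(sharedStream);

    /** Ordinary state access path: reuses the shared buffer. */
    public byte[] serializeCurrentKey(int keyGroup, String key, String namespace)
            throws IOException {
        writeKeyWithGroupAndNamespace(keyGroup, key, namespace, sharedStream, sharedView);
        return sharedStream.toByteArray();
    }

    /** Query path: may run on another thread, so it allocates its own buffer per call. */
    public byte[] serializeForQuery(int keyGroup, String key, String namespace)
            throws IOException {
        ByteArrayOutputStream tmpStream = new ByteArrayOutputStream(128);
        DataOutputStream tmpView = new DataOutputStream(tmpStream);
        writeKeyWithGroupAndNamespace(keyGroup, key, namespace, tmpStream, tmpView);
        return tmpStream.toByteArray();
    }

    // The helper touches no instance state, so it can be static. The 'final'
    // modifiers only prevent reassigning the parameters inside the method
    // (e.g. 'stream = sharedStream;' would not compile); they do not make the
    // streams themselves immutable or thread-safe.
    private static void writeKeyWithGroupAndNamespace(
            int keyGroup, String key, String namespace,
            final ByteArrayOutputStream stream, final DataOutputStream view)
            throws IOException {
        stream.reset();
        view.writeByte(keyGroup);
        view.writeUTF(key);
        view.writeUTF(namespace);
        view.flush();
    }

    public static void main(String[] args) throws IOException {
        KeySerializationSketch sketch = new KeySerializationSketch();
        byte[] fromTask = sketch.serializeCurrentKey(1, "k", "ns");
        byte[] fromQuery = sketch.serializeForQuery(1, "k", "ns");
        // Both paths produce the same bytes; only the buffer ownership differs.
        System.out.println(Arrays.equals(fromTask, fromQuery));
    }
}
```

          The per-call allocation costs a small buffer on every query, but it keeps the query path from ever touching the buffer that the ordinary state access methods assume they own.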

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3143

          uce Ufuk Celebi added a comment -

          Fixed in d8222c1 (release-1.2), d16552d (master).


            People

            • Assignee: NicoK Nico Kruber
            • Reporter: NicoK Nico Kruber