FLINK-6061

NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() on a keyed state in the open() function

    Details

      Description

      With the default (heap) state backend, the call to state.entries() "nicely fails" with an IllegalStateException:

      Caused by: java.lang.IllegalStateException: No key set.
      	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
      	at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
      	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
      	at java.lang.Thread.run(Thread.java:745)
      

      With a RocksDBStateBackend, it fails with an NPE:

      Caused by: java.lang.NullPointerException
      	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
      	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
      	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
      	at java.lang.Thread.run(Thread.java:745)
      

      The reason is that the record passed to the serializer is null, because backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
      This may also be the case for other RocksDB state implementations.
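
      To illustrate the difference between the two failure modes, here is a minimal sketch in plain Java. It is a toy model, not Flink's code: `ToyKeyedBackend` and its methods are hypothetical names. It only shows a fail-fast check on an unset current key, in the spirit of the heap backend's "No key set." message.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a keyed state backend; these are NOT Flink's actual classes. */
public class KeyContextSketch {

    /** Hypothetical stand-in for a backend that tracks the current key. */
    static class ToyKeyedBackend<K> {
        // Stays null until an element is being processed (e.g. during open()).
        private K currentKey;
        private final Map<K, Map<String, Integer>> stateByKey = new HashMap<>();

        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        /** Checks the key context first instead of passing null further down. */
        Map<String, Integer> entries() {
            if (currentKey == null) {
                throw new IllegalStateException("No key set.");
            }
            return stateByKey.computeIfAbsent(currentKey, k -> new HashMap<>());
        }
    }

    public static void main(String[] args) {
        ToyKeyedBackend<Short> backend = new ToyKeyedBackend<>();

        // Analogous to calling state.entries() in open(): no key context yet.
        try {
            backend.entries();
        } catch (IllegalStateException e) {
            System.out.println("open(): " + e.getMessage());
        }

        // Analogous to processing an element: the key context is set first.
        backend.setCurrentKey((short) 7);
        backend.entries().put("a", 1);
        System.out.println("processElement(): " + backend.entries());
    }
}
```

      Without the null check, the unset key would travel further down the call chain and surface as a less informative error, which is the situation the RocksDB stack trace above shows.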

      A reproducer based on 1.3-SNAPSHOT (needed for MapState) is available here:
      https://github.com/vpernin/flink-rocksdbstate-npe

      The reproducer is a nonsensical application. There is no MapState with TTL or expiration yet, so the goal was to shrink or expire the state at some interval.
      This could be done by iterating over the entries of the state and removing some of them.

      This probably cannot be done in the open() method of a rich function.
      I also tried implementing CheckpointListener and accessing the state content in the notifyCheckpointComplete() method, but that fails too, I guess due to the asynchronous nature of checkpoints.

          Activity

          srichter Stefan Richter added a comment - - edited

          Thanks for reporting this. I agree that the information from the `IllegalStateException` is helpful to the user, and I will add the same check to the RocksDB states. However, the behaviour is correct: there is no key context during initialization. Only when an element is processed does the key extracted from that element determine the key context of the backend.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3545

          FLINK-6061 Introduced IllegalStateException when operating RocksDB …

          …keyed state with no key set

          This PR fixes FLINK-6061. It throws an `IllegalStateException` when keyed state is accessed and no key is set in the backend.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink FLINK-FLINK-6061

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3545.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3545


          commit 0bb7b30351ab48669332a5023f419bf651277e75
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-15T11:23:12Z

          FLINK-6061 Introduced IllegalStateException when operating RocksDB keyed state with no key set


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3545

          Good fix, +1 to merge

          vpernin Vladislav Pernin added a comment -

          Impressed by the quick reaction, guys.

          Any idea how to properly implement an expiration mechanism on keyed state at a regular interval?

          srichter Stefan Richter added a comment -

          Vladislav Pernin, AFAIK there is no native support for TTL state yet. However, I think there is a better approach to emulate this at the user level than what you are currently trying. You could take a look at the `ProcessFunction` documentation. I am pretty sure Aljoscha Krettek could provide you with some more details and hints.

          Best,
          Stefan

          aljoscha Aljoscha Krettek added a comment -

          What Stefan Richter seems to be suggesting is to use a ProcessFunction with timers to implement state TTL. That is, you would set a timer for current time + timeout, and when the timer fires you would call clear() on the state. This works because timers are scoped to a key: when you set a timer a key is active, and when that timer fires the same key will be active again, i.e. you will have access to the same state.
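
          The idea can be sketched in plain Java as below. This is a toy model and deliberately not Flink code: `ToyKeyedOperator`, `processElement`, `advanceTimeTo` and `onTimer` are hypothetical names that only mimic the shape of the approach. It also assumes that each update should restart the timeout, which the comment above does not specify, so a timer only clears the state if no newer update has arrived since it was registered.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of key-scoped timers that expire keyed state; NOT Flink code. */
public class TimerTtlSketch {

    static class ToyKeyedOperator {
        private final long ttlMillis;
        // Per-key state: the stored value and the time of its last update.
        private final Map<String, String> valueByKey = new HashMap<>();
        private final Map<String, Long> lastUpdateByKey = new HashMap<>();
        // Timers are remembered together with the key that was active when set.
        private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

        ToyKeyedOperator(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        /** Updates the state of the given key and sets a timer for now + TTL. */
        void processElement(String key, String value, long now) {
            valueByKey.put(key, value);
            lastUpdateByKey.put(key, now);
            timers.computeIfAbsent(now + ttlMillis, t -> new HashSet<>()).add(key);
        }

        /** Fires every timer that is due, each one with its own key active. */
        void advanceTimeTo(long now) {
            while (!timers.isEmpty() && timers.firstKey() <= now) {
                Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
                for (String key : due.getValue()) {
                    onTimer(key, due.getKey());
                }
            }
        }

        /** Clears the key's state unless it was updated after this timer was set. */
        private void onTimer(String key, long timestamp) {
            Long lastUpdate = lastUpdateByKey.get(key);
            if (lastUpdate != null && lastUpdate + ttlMillis <= timestamp) {
                valueByKey.remove(key);
                lastUpdateByKey.remove(key);
            }
        }

        String get(String key) {
            return valueByKey.get(key);
        }
    }

    public static void main(String[] args) {
        ToyKeyedOperator op = new ToyKeyedOperator(1000);
        op.processElement("a", "v1", 0);
        op.processElement("b", "v2", 500);
        op.processElement("a", "v3", 600); // refreshes key "a"
        op.advanceTimeTo(1500);
        System.out.println(op.get("a")); // v3: the timer from t=0 found a newer update
        System.out.println(op.get("b")); // null: expired at t=1500
    }
}
```

          In a real job the timer registration and the callback would come from the ProcessFunction API rather than from hand-written bookkeeping like this.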

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3545

          Thanks @StephanEwen! Merging this.

          vpernin Vladislav Pernin added a comment -

          Thanks for the hint. I'll try this.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3545

          Fixed in 0bdc8bf (master)

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3545

          srichter Stefan Richter added a comment -

          Fixed in 0bdc8bf (master)


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              vpernin Vladislav Pernin