Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6061

NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() on a keyed state in the open() function

    Details

      Description

      With a default state (heap), the call to state.entries() "nicely fails" with a IllegalStateException :

      Caused by: java.lang.IllegalStateException: No key set.
      	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
      	at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
      	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
      	at java.lang.Thread.run(Thread.java:745)
      

      With a RocksDBStateBackend, it fails with a NPE :

      Caused by: java.lang.NullPointerException
      	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
      	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
      	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
      	at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
      	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
      	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
      	at java.lang.Thread.run(Thread.java:745)
      

      The reason is that the record is null, because backend.getCurrentKey() is null (not yet set) in AbstractRocksDBState.
      This may also be the case for other RockDBState implementations.

      You can find the reproducer here based on 1.3-SNAPSHOT (needed for the MapState) :
      https://github.com/vpernin/flink-rocksdbstate-npe

      The reproducer is a non sense application. There is no MapState with TTL or expiration yet, so the goal is to try to shrink or expire the state at some interval.
      This could be done by iterating over the entries of the state and removing some of them.

      This could probably not be done in the open() method of a rich function.
      I also tried to implement CheckpointListener and to access the state content in notifyCheckpointComplete() method, but it fails to, I guess due to the asynchronous nature of the checkpoint.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                srichter Stefan Richter
                Reporter:
                vpernin Vladislav Pernin
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: