Flink / FLINK-20646

ReduceTransformation does not work with RocksDBStateBackend

Details

    Description

      The intra-slot managed memory sharing (FLIP-141) requires transformations to properly declare their managed memory use cases.

      For the RocksDB state backend, every Transformation on a keyed stream (that is, with a non-null KeySelector) must call Transformation#updateManagedMemoryStateBackendUseCase. The newly introduced ReduceTransformation did not.
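The declaration requirement can be illustrated with a small self-contained model. This is only a sketch: `SketchTransformation`, `ReduceSketch`, and the `UseCase` enum are hypothetical stand-ins rather than Flink's classes. Only the method name `updateManagedMemoryStateBackendUseCase` comes from the description above, and its boolean parameter is an assumption.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Simplified stand-in for a Flink Transformation; not the real class. */
class SketchTransformation {
    /** Hypothetical subset of managed memory use cases. */
    enum UseCase { BATCH_OP, STATE_BACKEND }

    private final Set<UseCase> slotScopeUseCases = EnumSet.noneOf(UseCase.class);

    /** Named after the method in this issue; the boolean parameter is an assumption. */
    protected void updateManagedMemoryStateBackendUseCase(boolean hasStateBackend) {
        if (hasStateBackend) {
            slotScopeUseCases.add(UseCase.STATE_BACKEND);
        } else {
            slotScopeUseCases.remove(UseCase.STATE_BACKEND);
        }
    }

    /** Read-only view of the use cases this transformation has declared. */
    Set<UseCase> getSlotScopeUseCases() {
        return Collections.unmodifiableSet(slotScopeUseCases);
    }
}

/** Models a reduce transformation on a keyed stream, with and without the declaration. */
final class ReduceSketch extends SketchTransformation {
    ReduceSketch(Object keySelector, boolean declareUseCase) {
        if (declareUseCase) {
            // The step this issue reports as missing: declare the state backend use case.
            updateManagedMemoryStateBackendUseCase(keySelector != null);
        }
    }
}
```

In this model, `new ReduceSketch("key", false)` declares nothing, which corresponds to the behavior reported here, while `new ReduceSketch("key", true)` declares `STATE_BACKEND`.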

      As a result, Flink does not reserve managed memory for operators converted from ReduceTransformation (FLINK-19931), which leads to the following failure when the RocksDB state backend is used:

      16:58:49,373 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for StreamGroupedReduceOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from alternative (1/1), will retry while more alternatives are available.
      java.io.IOException: Failed to acquire shared cache resource for RocksDB
      	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:264) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-runtime_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-runtime_2.11-1.12.0.jar:1.12.0]
      	at java.lang.Thread.run(Thread.java:832) [?:?]
      Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
      	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:164) ~[flink-core-1.12.0.jar:1.12.0]
      	at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:631) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:612) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:499) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
      	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:260) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
      	... 16 more
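Why an undeclared use case ends in "The fraction of memory to allocate should not be 0" can be sketched with a simplified model. `FractionSketch` and both of its methods are hypothetical and do not reproduce Flink's `MemoryManager`. The weighting rule (a use case's weight divided by the total weight of all declared use cases) is an assumption made for illustration.

```java
import java.util.Map;
import java.util.Set;

/** Simplified model of managed memory fractions; not Flink's implementation. */
final class FractionSketch {

    /** Fraction of the slot's managed memory for useCase; 0 if it was never declared. */
    static double fractionFor(String useCase, Set<String> declared, Map<String, Integer> weights) {
        if (!declared.contains(useCase)) {
            return 0.0; // nothing was reserved for an undeclared use case
        }
        int total = 0;
        for (String d : declared) {
            total += weights.getOrDefault(d, 0);
        }
        return total == 0 ? 0.0 : (double) weights.getOrDefault(useCase, 0) / total;
    }

    /** Rejects a zero fraction, mirroring the message in the stack trace above. */
    static long computeMemorySize(long managedBytes, double fraction) {
        if (fraction <= 0.0) {
            throw new IllegalArgumentException(
                    "The fraction of memory to allocate should not be 0.");
        }
        if (fraction > 1.0) {
            throw new IllegalArgumentException("The fraction must not exceed 1.");
        }
        return (long) Math.floor(managedBytes * fraction);
    }
}
```

In this model, an empty set of declared use cases gives a fraction of 0 for the state backend, so `computeMemorySize` throws regardless of how the weights are configured. That is consistent with the report: the failure comes from the missing declaration rather than from `taskmanager.memory.managed.consumer-weights`.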
      

      The problem was reported on the user-zh mailing list (in Chinese):
      http://apache-flink.147419.n8.nabble.com/flink-1-12-RocksDBStateBackend-td9504.html

            People

              aljoscha Aljoscha Krettek
              xtsong Xintong Song