  1. Flink
  2. FLINK-29928

Allow sharing (RocksDB) memory between slots



      Added new config parameters:
      - state.backend.rocksdb.memory.fixed-per-tm


      Background and motivation

      RocksDB is one of the main consumers of off-heap memory, which it uses for BlockCache, MemTables, Indices and Bloom Filters.
      Since 1.10 (FLINK-7289), it is possible to:

      • share these objects among RocksDB instances of the same slot
      • bound the total memory usage by all RocksDB instances of a TM

      The memory is divided between the slots equally (unless using fine-grained resource control).
      This is sub-optimal if some slots contain more memory intensive tasks than the others.

      The proposal is to widen the scope of sharing memory to TM, so that it can be shared across all of its RocksDB instances.
      That would reduce the overall memory consumption in exchange for resource isolation.

      Proposed changes


      • introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no default)
        • cluster-level (yaml only)
        • used by a job only if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' is used for the job
        • use cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g.  "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"
        • doesn't affect Flink memory calculations; user needs to take it into account when planning capacity (similar to fixed-per-slot)
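The precedence described in the bullets above can be sketched as a small decision function. This is only an illustration: the class, enum and method names are hypothetical and not part of Flink, and the fallback to unbounded memory when nothing is configured is an assumption based on current RocksDB backend behaviour.

```java
import java.util.Optional;

/** Hypothetical sketch of how a job could pick its RocksDB memory mode. */
final class RocksDbMemoryModeSketch {

    /** Illustrative names only; Flink does not define this enum. */
    enum MemoryMode { FIXED_PER_SLOT, MANAGED, SHARED_PER_TM, UNBOUNDED }

    /**
     * @param jobFixedPerSlot value of state.backend.rocksdb.memory.fixed-per-slot, if set
     * @param jobUsesManaged value of state.backend.rocksdb.memory.managed
     * @param clusterFixedPerTm cluster-level state.backend.rocksdb.memory.fixed-per-tm, if set
     */
    static MemoryMode resolve(
            Optional<String> jobFixedPerSlot,
            boolean jobUsesManaged,
            Optional<String> clusterFixedPerTm) {
        if (jobFixedPerSlot.isPresent()) {
            // fixed-per-slot overrides the managed flag
            return MemoryMode.FIXED_PER_SLOT;
        }
        if (jobUsesManaged) {
            return MemoryMode.MANAGED;
        }
        if (clusterFixedPerTm.isPresent()) {
            // used only when neither of the other two options applies
            return MemoryMode.SHARED_PER_TM;
        }
        // Assumption: with nothing configured, RocksDB memory is not bounded by Flink.
        return MemoryMode.UNBOUNDED;
    }

    public static void main(String[] args) {
        Optional<String> perTm = Optional.of("1gb");
        System.out.println(resolve(Optional.empty(), false, perTm));
        System.out.println(resolve(Optional.empty(), true, perTm));
        System.out.println(resolve(Optional.of("50m"), true, perTm));
    }
}
```

The three calls in `main` correspond to a job with managed memory disabled, a job with managed memory enabled, and a job that sets fixed-per-slot on top of the managed flag.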


      # cluster-level configuration
      taskmanager.memory.managed.size: 1gb
      state.backend.rocksdb.memory.fixed-per-tm: 1gb
      taskmanager.numberOfTaskSlots: 10
      cluster.fine-grained-resource-management.enabled: false
      # job 1:
      state.backend.rocksdb.memory.managed: false # uses shared TM memory
      # job 2:
      state.backend.rocksdb.memory.managed: false # uses shared TM memory
      # job 3:
      state.backend.rocksdb.memory.managed: true # uses exclusive managed memory
      # job 4:
      state.backend.rocksdb.memory.managed: true # gets overridden below
      state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged memory

      Jobs 1 and 2 will use the same 1 GB of shared unmanaged memory and will compete with each other.
      Their Python code (or other consumers) will be able to use up to ~100 MB of managed memory per slot (1 GB divided among 10 slots).

      Jobs 3 and 4 are not affected as they specify using managed (3) or fixed-per-slot memory (4).
      Python code (or other consumers) will be able to use up to ~100 MB per slot but will compete with RocksDB in job 3.
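The "~100 MB per slot" figure follows from dividing the managed memory equally between the slots. A minimal sketch of that arithmetic, assuming Flink interprets `1gb` as 1024^3 bytes; the class and method names are hypothetical:

```java
import java.util.Locale;

/** Hypothetical helper reproducing the per-slot arithmetic of the example. */
final class SlotMemorySketch {

    /** Equal split of managed memory between slots (no fine-grained resource management). */
    static long managedBytesPerSlot(long managedBytes, int numberOfSlots) {
        return managedBytes / numberOfSlots;
    }

    public static void main(String[] args) {
        long managed = 1024L * 1024 * 1024; // taskmanager.memory.managed.size: 1gb
        int slots = 10;                     // taskmanager.numberOfTaskSlots: 10
        long perSlot = managedBytesPerSlot(managed, slots);
        // Prints "managed memory per slot: 102.4 MiB"
        System.out.println(String.format(
                Locale.ROOT, "managed memory per slot: %.1f MiB", perSlot / (1024.0 * 1024.0)));
    }
}
```

The 1 GB configured via fixed-per-tm is not part of this split; it sits outside Flink's memory calculations and has to be budgeted separately.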

      Creating and sharing RocksDB objects

      Introduce sharedResources to TaskManager.
      Then, similarly to the current slot-wise sharing using MemoryManager:

      • put/get OpaqueMemoryResource
      • create the Cache object from the backend code on the first call
      • release it when the last backend that uses it is destroyed
        This way flink-runtime doesn't have to depend on the state backend.
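The put/get, create-on-first-call and release-on-last-user steps amount to a reference-counted registry. A minimal sketch under stated assumptions: `SharedResourcesSketch`, `acquire` and `release` are hypothetical names, not the actual Flink API, and the resource is modelled as a plain `AutoCloseable` so that the registry knows nothing about RocksDB.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical reference-counted registry of opaque shared resources. */
final class SharedResourcesSketch {

    private static final class Entry {
        final AutoCloseable resource;
        int leases;

        Entry(AutoCloseable resource) {
            this.resource = resource;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Returns the resource for the key, creating it via the caller's initializer on first use. */
    synchronized AutoCloseable acquire(String key, Supplier<? extends AutoCloseable> initializer) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(initializer.get()));
        entry.leases++;
        return entry.resource;
    }

    /** Drops one lease; closes the resource when the last lease is released. */
    synchronized void release(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            throw new IllegalStateException("Unknown shared resource: " + key);
        }
        if (--entry.leases == 0) {
            entries.remove(key);
            try {
                entry.resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close shared resource: " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        SharedResourcesSketch shared = new SharedResourcesSketch();
        AutoCloseable cache = () -> System.out.println("cache closed");
        // Two backends ask for the same key; only the first call runs the initializer.
        AutoCloseable first = shared.acquire("rocksdb-cache", () -> cache);
        AutoCloseable second = shared.acquire("rocksdb-cache", () -> cache);
        System.out.println(first == second);
        shared.release("rocksdb-cache"); // one lease left, nothing is closed yet
        shared.release("rocksdb-cache"); // last lease released, the cache is closed
    }
}
```

Because the initializer is supplied by the caller, the registry itself needs no knowledge of the state backend classes. If the resource has to outlive individual jobs, one option would be for the TaskManager to hold a lease of its own for its whole lifetime.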

      Class loading and resolution

      RocksDB state backend is already a part of the distribution.
      However, if a job also includes it then classloader.resolve-order should be set to parent-first to prevent conflicts.


      The cache object should be destroyed on TM termination; job or task completion should NOT close it.


      • One way to test that the same RocksDB cache is used is via RocksDB metrics.
      • ITCases parameterization
      • manual and unit tests


      • classloader.resolve-order=child-first is not supported
      • fine-grained-resource-management is not supported
      • only RocksDB will be able to use TM-wise shared memory; other consumers may be adjusted later

      Rejected alternatives

      • set total "fixed-per-slot" to a larger value, essentially overcommitting unmanaged memory - doesn't work well in containerized environments (OOMErrors)
      • set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - requires more invasive changes in scheduler and TM
      • make part of managed memory shared; it is believed that managed memory must preserve the isolation property, among other concerns

      cc: yunta, ym, liyu





              roman Roman Khachatryan