Details
- New Feature
- Status: Closed
- Major
- Resolution: Fixed
- None
Description
Background and motivation
RocksDB is one of the main consumers of off-heap memory, which it uses for BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM
The memory is divided between the slots equally (unless using fine-grained resource control).
This is sub-optimal if some slots contain more memory-intensive tasks than others.
The proposal is to widen the scope of memory sharing to the TM, so that memory can be shared across all of its RocksDB instances.
That would reduce the overall memory consumption at the cost of resource isolation.
Proposed changes
Configuration
- introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no default)
- cluster-level (yaml only)
- used by a job only if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' is used for the job
- use cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"
- doesn't affect Flink memory calculations; user needs to take it into account when planning capacity (similar to fixed-per-slot)
Example
# cluster-level configuration
taskmanager.memory.managed.size: 1gb
state.backend.rocksdb.memory.fixed-per-tm: 1gb
taskmanager.numberOfTaskSlots: 10
cluster.fine-grained-resource-management.enabled: false
# job 1:
state.backend.rocksdb.memory.managed: false # uses shared TM memory
# job 2:
state.backend.rocksdb.memory.managed: false # uses shared TM memory
# job 3:
state.backend.rocksdb.memory.managed: true # uses exclusive managed memory
# job 4:
state.backend.rocksdb.memory.managed: true # gets overridden below
state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged memory
Jobs 1 and 2 will use the same 1 GB of shared unmanaged memory and will compete with each other.
Their Python code (or other consumers) will be able to use up to ~100 MB per slot (1 GB of managed memory divided across 10 slots).
Jobs 3 and 4 are not affected, as they specify using managed (3) or fixed-per-slot (4) memory.
Their Python code (or other consumers) will also be able to use up to ~100 MB per slot, but in job 3 it will compete with RocksDB for that memory.
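The selection rule applied to the four jobs above can be sketched as follows. This is a minimal illustration only: the class RocksDbMemoryModeSketch, the enum MemoryMode and the method resolve are hypothetical names, not Flink classes, and the UNBOUNDED fallback (no option set at all) is an assumption about the case the example does not cover.

```java
import java.util.Optional;

/** Hypothetical sketch of the option precedence; not part of Flink. */
final class RocksDbMemoryModeSketch {

    /** Where a job's RocksDB instances take their memory from. */
    enum MemoryMode { FIXED_PER_SLOT, MANAGED, SHARED_PER_TM, UNBOUNDED }

    /**
     * @param managed      job-level state.backend.rocksdb.memory.managed
     * @param fixedPerSlot job-level state.backend.rocksdb.memory.fixed-per-slot, if set
     * @param fixedPerTm   cluster-level state.backend.rocksdb.memory.fixed-per-tm, if set
     */
    static MemoryMode resolve(
            boolean managed, Optional<String> fixedPerSlot, Optional<String> fixedPerTm) {
        if (fixedPerSlot.isPresent()) {
            // fixed-per-slot overrides the managed flag (job 4 in the example)
            return MemoryMode.FIXED_PER_SLOT;
        }
        if (managed) {
            // exclusive managed memory (job 3)
            return MemoryMode.MANAGED;
        }
        if (fixedPerTm.isPresent()) {
            // neither job-level option is used: share the TM-wide budget (jobs 1 and 2)
            return MemoryMode.SHARED_PER_TM;
        }
        // assumption: with nothing configured, RocksDB memory is not bounded by Flink
        return MemoryMode.UNBOUNDED;
    }
}
```

With the cluster-level value set to 1gb, resolve(false, Optional.empty(), Optional.of("1gb")) yields SHARED_PER_TM for jobs 1 and 2, while job 4 resolves to FIXED_PER_SLOT even though it also sets managed to true.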
Creating and sharing RocksDB objects
Introduce sharedResources to TaskManager.
Then, similarly to the current slot-wise sharing using MemoryManager:
- put/get OpaqueMemoryResource
- Creation of Cache object is done from the backend code on the first call
- Release it when the last backend that uses it is destroyed
This way, flink-runtime doesn't have to depend on the state backend.
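The put/get, create-on-first-call and release-on-last-use steps above can be sketched as a small reference-counted registry. This is an illustration under assumptions, not the proposed implementation: SharedResourcesSketch, Lease and getOrCreate are hypothetical names, and the resource is typed only as AutoCloseable so that the registry knows nothing about RocksDB.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch of a TM-wide registry of opaque, reference-counted resources. */
final class SharedResourcesSketch {

    /** Handle given to one backend; closing it returns that backend's share. */
    interface Lease<T> extends AutoCloseable {
        T get();

        @Override
        void close();
    }

    private static final class Entry {
        final AutoCloseable resource;
        int leases;

        Entry(AutoCloseable resource) {
            this.resource = resource;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Returns the resource stored under key, running the initializer only on the first call. */
    synchronized <T extends AutoCloseable> Lease<T> getOrCreate(String key, Supplier<T> initializer) {
        // The initializer comes from backend code, so the registry never references RocksDB types.
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(initializer.get()));
        entry.leases++;
        @SuppressWarnings("unchecked")
        T resource = (T) entry.resource;
        AtomicBoolean released = new AtomicBoolean(false);
        return new Lease<T>() {
            @Override
            public T get() {
                return resource;
            }

            @Override
            public void close() {
                // Closing the same lease twice must not decrement the count twice.
                if (released.compareAndSet(false, true)) {
                    release(key, entry);
                }
            }
        };
    }

    private synchronized void release(String key, Entry entry) {
        if (--entry.leases == 0) {
            // Last user is gone: drop the entry and free the underlying resource.
            entries.remove(key);
            try {
                entry.resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close shared resource " + key, e);
            }
        }
    }
}
```

A backend would call getOrCreate with a supplier that builds the cache and close the returned lease when the backend is disposed. Because the supplier is passed in by the caller, the runtime side only ever sees an AutoCloseable.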
Class loading and resolution
The RocksDB state backend is already a part of the distribution.
However, if a job also includes it, then classloader.resolve-order should be set to parent-first to prevent conflicts.
Lifecycle
The cache object should be destroyed on TM termination; job or task completion should NOT close it.
Testing
- One way to test that the same RocksDB cache is used is via RocksDB metrics.
- ITCases parameterization
- manual and unit tests
Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may be adjusted later
Rejected alternatives
- set total "fixed-per-slot" to a larger value, essentially overcommitting unmanaged memory - doesn't work well in containerized environments (OOMErrors)
- set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - requires more invasive changes in scheduler and TM
- make part of managed memory shared; it is believed that managed memory must preserve the isolation property, among other concerns
Issue Links
- causes FLINK-30275 TaskExecutorTest.testSharedResourcesLifecycle fails (Reopened)