RocksDB is one of the main consumers of off-heap memory, which it uses for BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (
FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM
The memory is divided between the slots equally (unless using fine-grained resource control).
This is sub-optimal if some slots contain more memory intensive tasks than the others.
The proposal is to widen the scope of sharing memory to TM, so that it can be shared across all of its RocksDB instances.
That would reduce the overall memory consumption in exchange for resource isolation.
- introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no default)
- cluster-level (yaml only)
- used by a job only if neither 'state.backend.rocksdb.memory.fixed-per-slot' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not used for the job
- use cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"
- doesn't affect Flink memory calculations; user needs to take it into account when planning capacity (similar to fixed-per-slot)
Jobs 1 and2 will use the same 1Gb of shared unmanaged memory and will compete with each other.
Their Python code (or other consumers) will be able to use up to ~100Mb per slot.
Jobs 3 and 4 are not affected as they specify using managed (3) or fixed-per-slot memory (4).
Python code (or other consumers) will be able to use up to ~100Mb per slot but will compete with RocksDB in job (3).
Introduce sharedResources to TaskManager.
Then, similarly to the current slot-wise sharing using MemoryManager:
- put/get OpaqueMemoryResource
- Creation of Cache object is done from the backend code on the first call
- Release it when the last backend that uses it is destroyed
So flink-runtime doesn't have to depend on state backend.
RocksDB state backend is already a part of the distribution.
However, if a job also includes it then classloader.resolve-order should be set to parent-first to prevent conflicts.
The cache object should be destroyed on TM termnation; job or task completion should NOT close it.
- One way to test that the same RocksDB cache is used is via RocksDB metrics.
- manual and unit tests
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may be adjusted later
- set total "fixed-per-slot" to a larger value, essentially overcommitting unmanaged memory - doesn't work well in containerized environments (OOMErrors)
- set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - requires more invasive changes in scheduler and TM
- make part of managed memory shared; it is beleived that managed memory must preserve isolation proprty among other concerns