Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Not A Problem
- Affects Version/s: 1.18.1
- Fix Version/s: None
- Component/s: None
Description
I have had issues with memory constantly growing on PyFlink task managers in our production pipelines until they crash, which should not happen when we use RocksDB as our state backend.
I've made a simple example to demonstrate the possible memory leak. In this example I update state with a 1 MB value for each key and then sleep for one second. Memory grows by roughly 1 MB per second until the process crashes, as if the state value stays in memory. The same thing happens if I send 100 messages per second at 10 KB each. I've also tested `MapState` (a sketch of that variant follows the example below); the behavior is the same.
Either there is a memory leak, or my setup with the default RocksDB configuration just doesn't fit this example.
```python
import time

import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print("Processing", value, "Memory: ", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")
        # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
        # grows ~1 MB per second, which should not happen because we use RocksDB as the state backend
        self.state.update("a" * 1_000_000)  # 1 MB of state per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
    # - Make sure to use RocksDB as state backend
    environment.set_state_backend(EmbeddedRocksDBStateBackend())
    # - Create pipeline
    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )
    # - Execute pipeline
    environment.execute(job_name="memory_leak_test")
```
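For completeness, this is the `MapState` variant I mentioned above; it is a minimal sketch where only the state declaration and the update call change (the `MapStateProcessor` name is mine), and the rest of the pipeline stays exactly as in the example:

```python
import time

from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


class MapStateProcessor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        # Same setup as above, but with MapState instead of ValueState
        self.map_state = runtime_context.get_map_state(
            MapStateDescriptor(
                name="my_map_state",
                key_type_info=Types.INT(),
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # Write 1 MB into MapState for the current key; RSS grows the same way
        self.map_state.put(value, "a" * 1_000_000)
        time.sleep(1.0)
```

One thing that might explain the growth without a leak (an assumption on my part, not something I have confirmed): PyFlink keeps a Python-side cache of state values in the UDF worker process, which `psutil` measures, separately from the RocksDB state held by the task manager. If that cache is what grows, shrinking it via the `python.state.cache-size` option should flatten the RSS curve. A minimal sketch to test that hypothesis:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Assumption: the RSS growth comes from PyFlink's Python-side state cache
# rather than from RocksDB itself. "python.state.cache-size" is the option
# name as I understand the docs; verify it against your Flink version.
config = Configuration()
config.set_integer("python.state.cache-size", 0)  # try disabling Python-side caching

environment = StreamExecutionEnvironment.get_execution_environment(config)
```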