FLINK-34631: Possible memory leak in pyflink when using state with RocksDB


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.18.1
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      I have had issues with memory constantly growing on pyflink task managers in our production pipelines until they crash, which should not happen when we use RocksDB as our state backend.

      I've made a simple example to demonstrate the possible memory leak. In this example I update the state with a 1 MB value for each key and then sleep for 1 second. Memory grows by 1 MB per second until the process crashes, as if the state value stays in memory. The same thing happens if I send 100 messages of 10 KB each per second. I've also tested `MapState`; the behavior is the same.

      Either there is a memory leak, or my setup with the default RocksDB configuration just doesn't fit this example.

       

      ```python 
      import time

      import psutil

      from pyflink.common import Types
      from pyflink.datastream import (
          EmbeddedRocksDBStateBackend,
          KeyedProcessFunction,
          RuntimeContext,
          StreamExecutionEnvironment,
      )
      from pyflink.datastream.state import ValueStateDescriptor

      class Processor(KeyedProcessFunction):
          def open(self, runtime_context: RuntimeContext):
              self.state = runtime_context.get_state(
                  ValueStateDescriptor(
                      name="my_state",
                      value_type_info=Types.STRING(),
                  )
              )

          def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
              print("Processing", value, "Memory: ", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")

              # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
              # grows ~1 MB per second, which should not happen because we use RocksDB as the state backend
              self.state.update("a" * 1_000_000)  # write 1 MB of state data per element
              time.sleep(1.0)

      if __name__ == "__main__":
          # - Create flink environment

          environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

          # - Make sure to use RocksDB as state backend

          environment.set_state_backend(EmbeddedRocksDBStateBackend())

          # - Create pipeline

          (
              environment.from_collection(
                  collection=list(range(3600 * 12)),
              )
              .key_by(lambda value: value)
              .process(Processor())
          )

          # - Execute pipeline

          environment.execute(job_name="memory_leak_test")

      ```
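
      If the growth comes from PyFlink's Python-side buffering rather than from RocksDB itself, shrinking the bundle size and the Python state cache should cap it. I'm not sure this is the cause, so the snippet below is just a sketch of that check: `python.fn-execution.bundle.size`, `python.fn-execution.bundle.time`, and `python.state.cache-size` are standard Flink options, and the values are illustrative.

      ```python
      from pyflink.common import Configuration
      from pyflink.datastream import EmbeddedRocksDBStateBackend, StreamExecutionEnvironment

      # Assumption to test: the Python worker caches state writes and only hands them to
      # the JVM/RocksDB at bundle boundaries, so smaller bundles and a disabled Python-side
      # state cache should keep the Python process memory flat if there is no real leak.
      config = Configuration()
      config.set_integer("python.fn-execution.bundle.size", 100)   # flush after 100 elements
      config.set_integer("python.fn-execution.bundle.time", 1000)  # ... or after 1 second (ms)
      config.set_integer("python.state.cache-size", 0)             # illustrative: no Python-side state cache

      environment = (
          StreamExecutionEnvironment.get_execution_environment(config).set_parallelism(1)
      )
      environment.set_state_backend(EmbeddedRocksDBStateBackend())
      ```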

          People

            Assignee: Unassigned
            Reporter: Mark Lidenberg
