FLINK-12785: RocksDB savepoint recovery can use a lot of unmanaged memory


Details

      Release Note: Before FLINK-12785, users could encounter OOM when restoring from a savepoint of the RocksDB state backend if there were huge KV pairs. FLINK-12785 introduces a size limit in RocksDBWriteBatchWrapper, with a default value of 2 MB; RocksDB's WriteBatch is flushed once the memory it consumes exceeds that limit. Users can tune the limit through the state.backend.rocksdb.write-batch-size property in flink-conf.yaml if needed.
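
      For example, a flink-conf.yaml snippet using the property named above (the 4mb value is purely illustrative; the default is 2 MB as stated in the note):

      # flink-conf.yaml
      state.backend.rocksdb.write-batch-size: 4mb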

    Description

      I'm running an application that's backfilling data from Kafka. There's approximately 3 years' worth of data, with a lot of watermark skew (i.e., new partitions were created over time), and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in RocksDB (on the order of 40-50 MB per key).

      Running the pipeline tends to be mostly fine: it's not terribly fast when appends happen, but everything works. The problem comes when doing a savepoint restore; specifically, the taskmanagers eat RAM until the kernel kills them for being out of memory. The extra memory isn't JVM heap, since the memory usage of the process is ~4x the -Xmx value and there aren't any OutOfMemoryError exceptions.

      I traced the culprit of the memory growth to RocksDBFullRestoreOperation.java#L212. While the keys/values are deserialized onto the Java heap, RocksDBWriteBatchWrapper forwards them to RocksDB's WriteBatch, which buffers them in unmanaged memory. That's not in itself an issue, but RocksDBWriteBatchWrapper flushes based only on a number of records, not on the number of bytes in flight. Concretely, RocksDBWriteBatchWrapper flushes only once it has 500 records, and at 40 MB per key that's at least 20 GB of unmanaged memory before a flush.

      My suggestion would be to add an additional flush criterion to RocksDBWriteBatchWrapper, one based on batch.getDataSize() (e.g., flush at 500 records or 5 MB buffered, whichever comes first); a sketch of the idea follows below. This way, large key writes would be flushed to RocksDB promptly during recovery, or even during regular writes. I applied this approach and was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact, since I'm not very familiar with Flink.
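
      For illustration only, here is a minimal, hypothetical sketch of such a wrapper (not Flink's actual RocksDBWriteBatchWrapper): it buffers writes in a RocksJava WriteBatch and flushes when either a record-count or a byte-size threshold is crossed, using WriteBatch.getDataSize() for the size check. The class name and threshold values are placeholders.

      import org.rocksdb.ColumnFamilyHandle;
      import org.rocksdb.RocksDB;
      import org.rocksdb.RocksDBException;
      import org.rocksdb.WriteBatch;
      import org.rocksdb.WriteOptions;

      /**
       * Hypothetical sketch only: flushes the batch when EITHER the record count
       * OR the buffered byte size crosses a threshold, so a few very large KV
       * pairs cannot pile up gigabytes of unmanaged memory during restore.
       */
      public class SizeAwareWriteBatchWrapper implements AutoCloseable {

          private static final int MAX_RECORDS = 500;                    // existing count-based criterion
          private static final long MAX_BATCH_BYTES = 5L * 1024 * 1024;  // proposed size-based criterion (5 MB)

          private final RocksDB db;
          private final WriteOptions writeOptions;
          private final WriteBatch batch = new WriteBatch();
          private int recordCount = 0;

          public SizeAwareWriteBatchWrapper(RocksDB db, WriteOptions writeOptions) {
              this.db = db;
              this.writeOptions = writeOptions;
          }

          public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
              batch.put(handle, key, value);
              recordCount++;
              // WriteBatch.getDataSize() reports the bytes currently buffered off-heap.
              if (recordCount >= MAX_RECORDS || batch.getDataSize() >= MAX_BATCH_BYTES) {
                  flush();
              }
          }

          public void flush() throws RocksDBException {
              if (recordCount > 0) {
                  db.write(writeOptions, batch);
                  batch.clear();
                  recordCount = 0;
              }
          }

          @Override
          public void close() throws RocksDBException {
              flush();
              batch.close();
              // The WriteOptions instance is owned by the caller and not closed here.
          }
      }

      Flink's eventual fix, per the release note above, makes the byte limit configurable via state.backend.rocksdb.write-batch-size, with a 2 MB default.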

            People

              Assignee: Congxian Qiu (klion26)
              Reporter: Mike Kaplinskiy (mikekap)
