FLINK-12785

RocksDB savepoint recovery can use a lot of unmanaged memory

    Details

    • Release Note:
      Before FLINK-12785, users could encounter an OOM when restoring from a savepoint of the RocksDB state backend if the state contained very large KV pairs. FLINK-12785 introduces a size limit in RocksDBWriteBatchWrapper with a default value of 2 MB; RocksDB's WriteBatch is flushed once its consumed memory exceeds that limit. Users can tune the limit through the state.backend.rocksdb.write-batch-size property in flink-conf.yaml if needed.
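
      As an illustration of the tuning mentioned in the release note, the property might be set in flink-conf.yaml as below. The 4mb value is an arbitrary example chosen for this sketch, not a recommendation; the default is 2 MB.

```yaml
# flink-conf.yaml (fragment)
# Flush the RocksDB WriteBatch once it holds more than this much data.
# 4mb is an illustrative value only; the default is 2mb.
state.backend.rocksdb.write-batch-size: 4mb
```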

      Description

      I'm running an application that's backfilling data from Kafka. There's approximately 3 years' worth of data with a lot of watermark skew (i.e. new partitions were created over time), and I'm using daily windows. As a result, many of the windows buffer their contents until the watermark catches up and "releases" them. In turn, this gives me a lot of in-flight windows (200-300) with very large per-key state in RocksDB (on the order of 40-50 MB per key).

      Running the pipeline is mostly fine: it's not terribly fast when appends happen, but everything works. The problem comes when restoring from a savepoint: the TaskManagers consume RAM until the kernel kills them for running out of memory. The extra memory isn't JVM heap, since the memory usage of the process is ~4x the -Xmx value and there aren't any OutOfMemoryError exceptions.

      I traced the culprit of the memory growth to RocksDBFullRestoreOperation.java#L212. While the keys/values are deserialized on the Java heap, RocksDBWriteBatchWrapper forwards them to RocksDB's WriteBatch, which buffers in unmanaged memory. That's not an issue in itself, but RocksDBWriteBatchWrapper flushes based only on the number of records, not on the number of bytes in flight. Specifically, RocksDBWriteBatchWrapper flushes only once it has 500 records, and at 40 MB per key, that's at least 20 GB of unmanaged memory before a flush.

      My suggestion would be to add an additional flush criterion to RocksDBWriteBatchWrapper, one based on batch.getDataSize() (e.g. 500 records or 5 MB buffered). This way, writes of large keys would be flushed to RocksDB immediately during recovery or even during regular writes. I applied this approach and was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact, since I'm not very familiar with Flink.
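
      The proposed dual criterion can be sketched roughly as follows. This is a simplified, self-contained model and not Flink's actual RocksDBWriteBatchWrapper: the class SizeAwareBatch and all of its members are hypothetical names made up for this sketch, and the flush only resets counters where a real wrapper would write the batch to RocksDB. It tracks the peak number of buffered bytes so the two policies can be compared.

```java
/**
 * Hypothetical model of a write batch that flushes on record count OR byte size.
 * Not Flink's real class; flush() only resets counters instead of writing to RocksDB.
 */
class SizeAwareBatch {
    private final int maxRecords;   // flush once this many records are buffered
    private final long maxBytes;    // flush once this many bytes are buffered
    private int pendingRecords = 0;
    private long pendingBytes = 0;
    private long peakBytes = 0;     // largest amount ever buffered at once
    private int flushCount = 0;

    SizeAwareBatch(int maxRecords, long maxBytes) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
    }

    /** Buffers one key/value pair. */
    void put(byte[] key, byte[] value) {
        add((long) key.length + value.length);
    }

    /** Buffers a record of the given size without allocating it (for simulation). */
    void add(long recordBytes) {
        pendingRecords++;
        pendingBytes += recordBytes;
        peakBytes = Math.max(peakBytes, pendingBytes);
        // Flush when EITHER threshold is reached.
        if (pendingRecords >= maxRecords || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** A real wrapper would hand the batch to RocksDB here. */
    void flush() {
        if (pendingRecords == 0) {
            return;
        }
        pendingRecords = 0;
        pendingBytes = 0;
        flushCount++;
    }

    long getPeakBytes() { return peakBytes; }
    int getFlushCount() { return flushCount; }

    public static void main(String[] args) {
        final long MIB = 1024L * 1024L;
        // Count-only policy: the byte limit is effectively disabled.
        SizeAwareBatch countOnly = new SizeAwareBatch(500, Long.MAX_VALUE);
        // Count-or-size policy with the 5 MB example threshold from the suggestion.
        SizeAwareBatch sizeAware = new SizeAwareBatch(500, 5 * MIB);
        for (int i = 0; i < 500; i++) {
            countOnly.add(40 * MIB);
            sizeAware.add(40 * MIB);
        }
        System.out.println("count-only peak MiB: " + countOnly.getPeakBytes() / MIB);
        System.out.println("size-aware peak MiB: " + sizeAware.getPeakBytes() / MIB);
    }
}
```

      In this model, 500 records of 40 MiB each peak at 20000 MiB buffered under the count-only policy, while the size-aware policy never holds more than one 40 MiB record, because each record alone already exceeds the byte threshold and is flushed right away.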

              People

              • Assignee: Congxian Qiu (klion26)
              • Reporter: Mike Kaplinskiy (mikekap)