Flink / FLINK-19911

Add a read buffer for the input stream

Details

    Description

      During a snapshot, the heap state backend serializes each Java object and writes it to the file system. Likewise, the RocksDB state backend's RocksFullSnapshotStrategy reads key-value pairs from RocksDB and writes them to the file system.

      Both cases produce many small I/O operations rather than a few large ones, and frequent small I/Os perform poorly on disk. For this reason, checkpoint snapshots are written to the file system through a buffer; see the write buffer in FsCheckpointStreamFactory.FsCheckpointStateOutputStream.
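To make the effect of that write buffer concrete, here is a minimal sketch that uses only java.io; it is not Flink's FsCheckpointStateOutputStream, whose internals this issue does not show. CountingSink and WriteBufferDemo are hypothetical names. The sink counts how many write calls would reach the file system when 10,000 ints are serialized one by one, with and without a 4096-byte BufferedOutputStream in between.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sink that counts the write calls reaching it; each call stands in
// for one (possibly small) I/O against the checkpoint file.
class CountingSink extends OutputStream {
    int writeCalls;
    long bytes;

    @Override public void write(int b) { writeCalls++; bytes++; }

    @Override public void write(byte[] b, int off, int len) { writeCalls++; bytes += len; }
}

class WriteBufferDemo {
    // Serializes `records` ints one at a time, mimicking many small state entries.
    static CountingSink serialize(int records, boolean buffered) throws IOException {
        CountingSink sink = new CountingSink();
        OutputStream target = buffered ? new BufferedOutputStream(sink, 4096) : sink;
        try (DataOutputStream out = new DataOutputStream(target)) {
            for (int i = 0; i < records; i++) out.writeInt(i);
        } // close() flushes whatever is still buffered
        return sink;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered write calls: " + serialize(10_000, false).writeCalls);
        System.out.println("buffered write calls:   " + serialize(10_000, true).writeCalls);
    }
}
```

With the buffer, the 40,000 bytes reach the sink in 10 calls (nine full 4096-byte chunks plus the remainder flushed on close). Without it, every writeInt reaches the sink directly, as one or four calls per int depending on the JDK's DataOutputStream implementation.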

      The restore path also issues many small I/Os, but unlike the snapshot path it has no buffer. I added a read buffer and tested it with a Flink job.

      Flink job environment:

       Flink version: 1.10
       State backend: FsStateBackend
       Query: Flink SQL count(distinct userId)
       UV (distinct userIds): 10 million
       State size: 200 MB
       TM total memory: 16 GB
       Parallelism: 1

      Restore took 33.1 s without the read buffer and 12.8 s with it.
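Expressed as a ratio, computed only from the two timings of this single job:

```latex
\text{speedup} = \frac{33.1\,\text{s}}{12.8\,\text{s}} \approx 2.59,
\qquad
\text{time saved} = 33.1\,\text{s} - 12.8\,\text{s} = 20.3\,\text{s} \approx 61\%\ \text{of the unbuffered restore time}
```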

      How to do it?

      In HeapRestoreOperation#restore, wrap fsDataInputStream in an FSDataBufferedInputStream:

      FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
      // Serve the many small reads made while deserializing state from an in-memory buffer.
      FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
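The issue does not show FSDataBufferedInputStream itself. The sketch below is one possible shape for such a wrapper, not Flink's implementation. SeekableInputStream is a hypothetical stand-in for Flink's FSDataInputStream (an InputStream that also exposes seek(long) and getPos()), and CountingByteArrayStream, BufferedSeekableInputStream and ReadBufferDemo are illustrative names. Seeking reuses the buffer when the target is already loaded and otherwise discards it, so getPos() always reflects what the caller has consumed.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for FSDataInputStream: an InputStream with seek/getPos.
abstract class SeekableInputStream extends InputStream {
    public abstract void seek(long desired) throws IOException;
    public abstract long getPos() throws IOException;
}

// In-memory source that counts the read calls reaching it; each call stands in
// for one (possibly small) I/O against the checkpoint file.
class CountingByteArrayStream extends SeekableInputStream {
    private final byte[] data;
    private int pos;
    int readCalls;

    CountingByteArrayStream(byte[] data) { this.data = data; }

    @Override public int read() {
        readCalls++;
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }

    @Override public int read(byte[] b, int off, int len) {
        readCalls++;
        if (len == 0) return 0;
        if (pos >= data.length) return -1;
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, b, off, n);
        pos += n;
        return n;
    }

    @Override public void seek(long desired) { pos = (int) desired; }
    @Override public long getPos() { return pos; }
}

// Read-buffered wrapper: small reads are served from an in-memory buffer that
// is refilled with one large read from the wrapped stream.
class BufferedSeekableInputStream extends SeekableInputStream {
    private final SeekableInputStream in;
    private final byte[] buf;
    private long bufStart; // position in the wrapped stream of buf[0]
    private int count;     // number of valid bytes in buf
    private int index;     // next byte of buf to hand out

    BufferedSeekableInputStream(SeekableInputStream in, int bufferSize) throws IOException {
        this.in = in;
        this.buf = new byte[bufferSize];
        this.bufStart = in.getPos();
    }

    // Refills after the buffer is exhausted; returns false at end of stream.
    private boolean fill() throws IOException {
        bufStart += count;
        index = 0;
        count = Math.max(in.read(buf, 0, buf.length), 0);
        return count > 0;
    }

    @Override public int read() throws IOException {
        if (index >= count && !fill()) return -1;
        return buf[index++] & 0xFF;
    }

    @Override public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) return 0;
        if (index >= count && !fill()) return -1;
        int n = Math.min(len, count - index);
        System.arraycopy(buf, index, b, off, n);
        index += n;
        return n;
    }

    @Override public void seek(long desired) throws IOException {
        if (desired >= bufStart && desired < bufStart + count) {
            index = (int) (desired - bufStart); // target already buffered
        } else {
            in.seek(desired); // drop the buffer and move the wrapped stream
            bufStart = desired;
            count = 0;
            index = 0;
        }
    }

    @Override public long getPos() { return bufStart + index; }

    @Override public void close() throws IOException { in.close(); }
}

class ReadBufferDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = new byte[40_000]; // room for 10,000 serialized ints
        for (boolean buffered : new boolean[] {false, true}) {
            CountingByteArrayStream raw = new CountingByteArrayStream(data);
            InputStream src = buffered ? new BufferedSeekableInputStream(raw, 4096) : raw;
            DataInputStream in = new DataInputStream(src);
            for (int i = 0; i < 10_000; i++) in.readInt(); // many small reads
            System.out.println((buffered ? "buffered" : "unbuffered") + " read calls: " + raw.readCalls);
        }
    }
}
```

Reading 10,000 ints through the 4096-byte buffer reaches the wrapped stream 10 times, versus at least 10,000 times without it. A fuller version could also bypass the buffer for reads at least as large as the buffer, as java.io.BufferedInputStream does.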

            People

              Assignee: Unassigned
              Reporter: Rui Fan (fanrui)
              Votes: 0
              Watchers: 5