KAFKA-14629: Performance improvement for Zstd compressed workload

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component: core

    Description

      Motivation

      From a CPU flamegraph analysis of a compressed workload (openmessaging), we observed that the validateMessagesAndAssignOffsets method accounts for 75% of the total CPU time spent in UnifiedLog.appendAsLeader(). The improvements suggested below will reduce CPU usage and increase throughput.

      Background

      A producer appends multiple records into a batch and compresses the batch. The compressed batch, along with some headers, is sent to the server. The server computes a checksum over the batch to validate data integrity across the network transfer; at this point the batch payload is still in compressed form. The broker then tries to append this batch to the log. Before appending, the broker performs schema integrity validation on the individual records, e.g. checking that record offsets are monotonically increasing. To perform these validations, the server has to decompress the batch.

      The schema validation of a batch on the server is done by decompressing and validating individual records. For each record, the validation needs to read all fields of the record except for the key and value. [1]
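      As an illustration of this read-and-skip pattern, a simplified sketch of per-record validation is shown below. The field layout and names are illustrative only and do not reproduce the exact DefaultRecord wire format.

      import java.io.DataInputStream;
      import java.io.IOException;

      // Illustrative sketch: field layout is simplified, not Kafka's actual record format.
      public class RecordValidationSketch {

          static void validateRecord(DataInputStream in, int expectedOffsetDelta) throws IOException {
              int sizeInBytes = in.readInt();       // total record size (read, not skipped)
              byte attributes = in.readByte();      // record-level attributes
              long timestampDelta = in.readLong();  // checked against batch-level timestamp
              int offsetDelta = in.readInt();       // must be monotonically increasing

              if (offsetDelta != expectedOffsetDelta)
                  throw new IllegalStateException("Record offsets are not monotonically increasing");

              // Key and value are not needed for validation; ideally these bytes would be
              // skipped rather than decompressed into a buffer and copied around.
              int keyLength = in.readInt();
              skipFully(in, Math.max(keyLength, 0));
              int valueLength = in.readInt();
              skipFully(in, Math.max(valueLength, 0));
          }

          static void skipFully(DataInputStream in, int n) throws IOException {
              while (n > 0) {
                  int skipped = in.skipBytes(n);
                  if (skipped <= 0) throw new IOException("Unexpected end of record");
                  n -= skipped;
              }
          }
      }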

      Performance requirements

      Pre-allocation of arrays should not add excessive overhead for batches with small records → for example, allocating a 65KB array for a 1KB record is overkill and negatively impacts performance for small requests.

      Overhead of skipping bytes should be minimal → we don’t need to read the key/value of a record, which on average make up the largest share of the data in a record. The implementation should skip key/value bytes efficiently.

      Minimize the number of JNI calls → JNI calls are expensive and work best when fewer calls are made to compress/decompress the same amount of data.

      Minimize new byte array/buffer allocations → ideally, the only allocation should be the array used to store the result of decompression. Even this could be optimized by using buffers backed by direct memory or by re-using the same buffer, since we process one record at a time.

      Current implementation - decompression + zstd

      We allocate a 2KB array called skipArray to store decompressed data [2]. This array is re-used for the scope of a batch (i.e. across all records).

      We allocate a 16KB array to buffer the data between skipArray and the underlying zstd-jni library calls [3]. The motivation is to read at least 16KB of data at a time in a single call to the JNI layer. This array is re-used for the scope of a batch (i.e. across all records).

      We provide a BufferPool to zstd-jni. It uses this pool to create buffers for its own use, i.e. one allocation per batch and one allocation per skip() call. Note that this pool is not used to store the output of decompression. Currently, we use a BufferPool which is scoped to a thread.
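      For illustration, a rough sketch of this layering is shown below. The zstd-jni class ZstdInputStreamNoFinalizer is real; the surrounding structure and helper names only approximate the Kafka code referenced in [2] and [3].

      import java.io.BufferedInputStream;
      import java.io.ByteArrayInputStream;
      import java.io.DataInputStream;
      import java.io.IOException;
      import java.io.InputStream;

      import com.github.luben.zstd.ZstdInputStreamNoFinalizer;

      public class CurrentLayeringSketch {

          // Innermost layer: zstd-jni stream, where each read() crosses the JNI boundary.
          // Middle layer: 16KB intermediate buffer so the JNI layer is asked for at least
          // 16KB at a time (see [3]).
          static DataInputStream wrapForInput(byte[] compressedBatch) throws IOException {
              InputStream zstdStream = new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(compressedBatch));
              return new DataInputStream(new BufferedInputStream(zstdStream, 16 * 1024));
          }

          // Outermost layer: per-batch 2KB skipArray used to "skip" key/value bytes by
          // reading them into a throwaway array (see [2]).
          static void skipBytes(DataInputStream in, int bytesToSkip, byte[] skipArray) throws IOException {
              while (bytesToSkip > 0) {
                  int read = in.read(skipArray, 0, Math.min(bytesToSkip, skipArray.length));
                  if (read < 0) throw new IOException("Unexpected end of batch");
                  bytesToSkip -= read;
              }
          }
      }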

      Potential improvements

      1. Do not read the end of the batch, since it contains the key/value of the last record. Instead of “skipping” it, which would still require decompression, we can simply not read it at all.
      2. Remove the two layers of buffers (the 16KB one and the 2KB one) and replace them with a single buffer called decompressionBuffer (see the sketch after this list). The time it takes to prepare a batch for decompression is bounded by the allocation of the largest buffer; hence, using only one large buffer (16KB) does not cause any regression.
      3. Use BufferSupplier to allocate the intermediate decompression buffer (also shown in the sketch below).
      4. Calculate the size of the decompression buffer dynamically at runtime, e.g. based on the recommendation provided by Zstd; it is currently fixed at 16KB. Using the value recommended by Zstd saves a copy in native code. https://github.com/facebook/zstd/issues/340
      5. Provide a pool of direct buffers to zstd-jni for its internal usage. Direct buffers are ideal for scenarios where data is transferred across JNI, such as (de)compression. The latest version of zstd-jni works with direct buffers.
      6. Read the network input into a direct buffer and pass that to zstd-jni for decompression. Store the output in a direct buffer as well.
      7. Use the dictionary functionality of Zstd decompression. Train a dictionary on the first few MBs of data and then use it.
      8. Use the skip functionality of zstd-jni and do not bring “skipped” data to the Kafka layer; then we don’t need a buffer to store skipped data in Kafka. This could be done by using DataInputStream and removing the intermediate buffered stream (the 16KB one).
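      A rough sketch of suggestions 2 and 3 combined is shown below. The DecompressionBufferSupplier interface is a hypothetical stand-in for Kafka's thread-scoped buffer pool, and the record-parsing step is elided; this is a sketch of the idea, not the prototype implementation.

      import java.io.ByteArrayInputStream;
      import java.io.IOException;
      import java.io.InputStream;
      import java.nio.ByteBuffer;

      import com.github.luben.zstd.ZstdInputStreamNoFinalizer;

      public class SingleBufferSketch {

          // Hypothetical stand-in for a thread-scoped buffer pool; assumed to hand out
          // heap (array-backed) buffers that are returned once the batch is processed.
          interface DecompressionBufferSupplier {
              ByteBuffer get(int capacity);
              void release(ByteBuffer buffer);
          }

          // Suggestions 2 + 3: one pooled decompressionBuffer per batch, with no 2KB
          // skipArray and no 16KB BufferedInputStream in between.
          static void iterateRecords(byte[] compressedBatch, DecompressionBufferSupplier supplier) throws IOException {
              // Fixed at 16KB here; suggestion 4 would size this from Zstd's recommended output size.
              ByteBuffer decompressionBuffer = supplier.get(16 * 1024);
              try (InputStream zstd = new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(compressedBatch))) {
                  int read;
                  while ((read = zstd.read(decompressionBuffer.array(), 0, decompressionBuffer.capacity())) > 0) {
                      // Parse record headers/offsets from decompressionBuffer[0..read) here and
                      // step over key/value bytes by position arithmetic rather than copying them.
                  }
              } finally {
                  supplier.release(decompressionBuffer);
              }
          }
      }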

      Prototype implementation

      https://github.com/divijvaidya/kafka/commits/optimize-compression 

      JMH benchmark of prototype

      After implementing suggestion#2 and suggestion#3, we observe a 10-25% improvement in throughput over the existing implementation for large message sizes and a 0-2% improvement in throughput for small message sizes. Note that we expect this performance to improve further in production, because the thread-scoped cached memory pool will be re-used to a greater extent there. For detailed results see the attached benchmark.

      Reference

      [1] LogValidator.java
      [2] DefaultRecordBatch → skipArray
      [3] ZStdFactory

      Attachments

        1. benchmark-jira.xlsx
          17 kB
          Divij Vaidya
