  Kafka / KAFKA-13286

Revisit Streams State Store and Serde Implementation


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component/s: streams

    Description

      The Kafka Streams state store is built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (RocksDB, in-memory), and it leverages the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design:

      • The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils).
      • The serde operation could happen on multiple layers of the state store hierarchy, which means we incur extra byte array copies as we move along doing serdes. For example, we do serde in the metered layer, but then again in the cached layer with cache functions, and also in the logged stores to generate the key/value bytes sent to Kafka.

      To improve on this, we can consider adding support for serde into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying buffers to avoid the unnecessary copying.

      1) More specifically, the serialize interface could be refactored to, e.g.:

      ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
      

      where the serialized bytes would be appended to the ByteBuffer. When a series of serialize functions is called along the state store hierarchy, we just need to make sure that whatever should be appended to the ByteBuffer first gets serialized first. E.g. if the serialized byte format of a WindowSchema is <timestamp, boolean, key>,

      then we would need to call serialize as in:

      serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
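      As a rough illustration (a minimal sketch only; the ByteBufferSerializer interface and the WindowKeySerializationSketch / TIMESTAMP / LEFT_RIGHT / KEY names below are hypothetical, not existing Streams APIs), such an appending serializer could compose like this in Java:

      import java.nio.ByteBuffer;
      import java.nio.charset.StandardCharsets;

      public class WindowKeySerializationSketch {

          // Hypothetical serde interface that appends into a caller-supplied buffer
          // and returns the same buffer so that calls can be nested.
          interface ByteBufferSerializer<T> {
              ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
          }

          static final ByteBufferSerializer<Long> TIMESTAMP =
              (topic, ts, buf) -> buf.putLong(ts);

          static final ByteBufferSerializer<Boolean> LEFT_RIGHT =
              (topic, b, buf) -> buf.put((byte) (b ? 1 : 0));

          static final ByteBufferSerializer<String> KEY =
              (topic, k, buf) -> buf.put(k.getBytes(StandardCharsets.UTF_8));

          public static void main(String[] args) {
              ByteBuffer buffer = ByteBuffer.allocate(64);

              // Whatever should appear first in the composite key <timestamp, boolean, key>
              // is serialized first; each call appends to the same underlying array.
              KEY.serialize("topic", "user-42",
                  LEFT_RIGHT.serialize("topic", true,
                      TIMESTAMP.serialize("topic", 1_632_000_000_000L, buffer)));

              buffer.flip();
              System.out.println("composite key length in bytes: " + buffer.remaining());
          }
      }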
      

      2) In addition, we can consider having a pool of ByteBuffers representing a set of byte arrays that can be re-used. This can be captured as an intelligent ByteBufferSupplier, which provides:

      ByteBuffer ByteBufferSupplier#allocate(long size)
      

      Its implementation can choose to either create new byte arrays or re-use existing ones in the pool; the gotcha, though, is that we usually may not know the serialized byte length for raw keys (think: in practice the keys would be in JSON / Avro etc.), and hence would not know what size to pass in for allocation; we may therefore need to be conservative, or use trial and error (see the sketch below).
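      One possible way to deal with the unknown length is the trial-and-error approach mentioned above: guess a conservative capacity and retry with a larger buffer on overflow. A hedged sketch (the writeKey / serializeWithRetry helpers are made up for illustration):

      import java.nio.BufferOverflowException;
      import java.nio.ByteBuffer;

      public class GrowOnOverflowSketch {

          // Stand-in for an append-style serialization step whose output size
          // is not known up front (e.g. a JSON/Avro-encoded key).
          static ByteBuffer writeKey(byte[] rawKey, ByteBuffer buffer) {
              return buffer.put(rawKey);
          }

          // Retry with a doubled capacity whenever the guess was too small.
          static ByteBuffer serializeWithRetry(byte[] rawKey, int initialCapacity) {
              int capacity = initialCapacity;
              while (true) {
                  ByteBuffer buffer = ByteBuffer.allocate(capacity);
                  try {
                      return writeKey(rawKey, buffer);
                  } catch (BufferOverflowException e) {
                      capacity *= 2; // conservative guess was too small: grow and retry
                  }
              }
          }

          public static void main(String[] args) {
              ByteBuffer result = serializeWithRetry(new byte[100], 16);
              System.out.println("bytes written: " + result.position());
          }
      }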

      Of course callers then would be responsible for returning the used ByteBuffer back to the Supplier via

      ByteBufferSupplier#deallocate(ByteBuffer buffer)
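      A minimal sketch of what such a pooling supplier could look like (the ByteBufferSupplier idea comes from this proposal; the pooling strategy and the use of int rather than long sizes are just illustrative choices, since ByteBuffer capacities are ints):

      import java.nio.ByteBuffer;
      import java.util.ArrayDeque;
      import java.util.Deque;

      // Hands out ByteBuffers, preferring to re-use previously returned ones
      // over allocating new byte arrays.
      public class PooledByteBufferSupplier {

          private final Deque<ByteBuffer> pool = new ArrayDeque<>();

          public ByteBuffer allocate(final int size) {
              final ByteBuffer pooled = pool.pollFirst();
              if (pooled != null && pooled.capacity() >= size) {
                  pooled.clear();          // reset position/limit for re-use
                  return pooled;
              }
              if (pooled != null) {
                  pool.addFirst(pooled);   // too small for this request; keep it pooled
              }
              return ByteBuffer.allocate(size);
          }

          // Callers return buffers when done so they can be handed out again.
          public void deallocate(final ByteBuffer buffer) {
              pool.addFirst(buffer);
          }
      }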
      

      Some quick notes here regarding concurrency and sharing of the byte-buffer pools:

      • For pull query handling threads, if we do not do any deserialization then we would not need to access the ByteBufferSuppliers, hence there's no concurrent access.
      • For multiple stream threads, my intention is to have each thread get its own isolated byte-buffer pool to avoid any concurrent access (see the sketch below).
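      For example (a sketch only, re-using the hypothetical PooledByteBufferSupplier above), the per-thread isolation could be as simple as a thread-local supplier:

      import java.nio.ByteBuffer;

      public class PerThreadBufferPools {

          // Each stream thread lazily gets its own supplier, so pools are never
          // shared across threads and need no synchronization.
          private static final ThreadLocal<PooledByteBufferSupplier> THREAD_POOL =
              ThreadLocal.withInitial(PooledByteBufferSupplier::new);

          public static ByteBuffer borrow(final int size) {
              return THREAD_POOL.get().allocate(size);
          }

          public static void release(final ByteBuffer buffer) {
              THREAD_POOL.get().deallocate(buffer);
          }
      }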

      3) With RocksDB's direct byte-buffer support (KAFKA-9168) we can optionally also use the allocated ByteBuffers via RocksDB's direct API so that puts/gets do not go through JNI byte-array copies, which is more efficient. The Supplier would then need to be careful to deallocate these direct byte-buffers since they would not be promptly GC'ed by the JVM.
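      A rough sketch of how the allocated direct buffers could be used (assuming the direct-ByteBuffer put/get overloads available in recent rocksdbjni releases, which is what KAFKA-9168 is about; exact signatures may differ across RocksDB versions, and the path below is arbitrary):

      import java.nio.ByteBuffer;
      import org.rocksdb.Options;
      import org.rocksdb.ReadOptions;
      import org.rocksdb.RocksDB;
      import org.rocksdb.RocksDBException;
      import org.rocksdb.WriteOptions;

      public class DirectBufferRocksDbSketch {

          public static void main(final String[] args) throws RocksDBException {
              RocksDB.loadLibrary();

              try (final Options options = new Options().setCreateIfMissing(true);
                   final RocksDB db = RocksDB.open(options, "/tmp/direct-buffer-sketch");
                   final WriteOptions writeOptions = new WriteOptions();
                   final ReadOptions readOptions = new ReadOptions()) {

                  // Direct buffers live in native memory, so puts/gets can avoid the
                  // extra JNI byte[] copy; allocate them conservatively large and re-use.
                  final ByteBuffer key = ByteBuffer.allocateDirect(64);
                  final ByteBuffer value = ByteBuffer.allocateDirect(1024);

                  key.putLong(42L).flip();
                  value.putLong(123L).flip();
                  db.put(writeOptions, key, value);

                  key.rewind();
                  final ByteBuffer out = ByteBuffer.allocateDirect(1024);
                  final int valueSize = db.get(readOptions, key, out);
                  System.out.println("stored value size: " + valueSize);
              }
          }
      }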

      There's a catch with direct ByteBuffers that we'd need to be careful about though: although a direct byte buffer is also managed by the JVM (and hence the GC) like a heap byte buffer, it is not managed in the same way as the latter [1][2] and the GC seems to be more conservative about reclaiming it. It has been suggested that users sometimes need to manually deallocate direct buffers if the GC does not reclaim them promptly. I think the most effective approach is to try our best to re-use allocated direct byte-buffers from native memory, which means we probably want to allocate a conservatively large size (if we do not know the serialized length) so that they can be reused.

      [1] https://www.fusion-reactor.com/blog/evangelism/understanding-java-buffer-pool-memory-space/#:~:text=A%20direct%20buffer%20is%20a,allocateDirect()%20factory%20method
      [2] https://stackoverflow.com/questions/3496508/deallocating-direct-buffer-native-memory-in-java-for-jogl/26777380#26777380


            People

              Assignee: Unassigned
              Reporter: Guozhang Wang
              Votes: 0
              Watchers: 6
