Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      The way we are defining our protocol is really a bit embarrassing. It is full of ad hoc serialization code for each API. This code is very fiddly and opaque, and when it has errors they are hard to debug. Since it is all done one-off, it is also very easy for it to become inconsistent. This was tolerable when there were only two APIs with a few fields each, but now there are a half dozen more complex APIs. By my count there are now over 1000 lines of code in kafka.apis.*.

      One option would be to use protocol buffers or thrift or another schema-oriented code gen RPC language. However, I think this is probably the wrong direction for a couple of reasons. First, we want something that works well with our I/O model, both network and disk, which is very NIO-centric, so it should work directly with ByteBuffers. Second, I feel that these systems complicate the specification of the protocol. They give a schema, which is a great high-level description, but the translation of that to bytes is essentially whatever their code-gen engine chooses to do. These things are a great way to build application services, but not great for something like what we are building.

      Instead, I think we should keep doing what we have done: specify the protocol on the wiki. However, we should write a little helper code to make our lives easier.

      Here is my recommendation for how this code would work. We add two helper classes: Schema and Record.

      You define message formats like this:
      import Types._
      val FetchRequestProtocol =
        Schema("ReplicaId"->int32,
               "MaxWaitTime"->int32,
               "MinBytes"->int32,
               Seq("TopicName"->utf8,
                   Seq("Partition"->int32,
                       "FetchOffset"->int64,
                       "MaxBytes"->int32)))

      Note that this almost exactly matches the BNF for the fetch request:
      https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
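
To make the schema definition concrete, here is a minimal sketch of how the type descriptors and the Schema class could be modeled. Everything in it is an assumption for illustration: the `Type` hierarchy, the `ArrayOf` wrapper used in place of the nested `Seq(...)` shorthand, and the array field names `Topics` and `Partitions` are not part of the proposal.

```scala
// Hypothetical type descriptors; names follow the proposal's int32/int64/utf8.
sealed trait Type
case object Int32 extends Type
case object Int64 extends Type
case object Utf8 extends Type
// Assumed way to express a repeated group: an array whose elements follow a schema.
final case class ArrayOf(element: Schema) extends Type

// A field knows its fixed position (slot) within the schema.
final case class Field(name: String, tpe: Type, index: Int)

final class Schema(defs: Seq[(String, Type)]) {
  val fields: Vector[Field] =
    defs.zipWithIndex.map { case ((name, tpe), i) => Field(name, tpe, i) }.toVector
  private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

  // Resolve a field by name once; callers can keep the Field for later use.
  def apply(name: String): Field = byName(name)
}

object Schema {
  def apply(defs: (String, Type)*): Schema = new Schema(defs)
}

object FetchRequestSketch {
  val PartitionSchema: Schema =
    Schema("Partition" -> Int32, "FetchOffset" -> Int64, "MaxBytes" -> Int32)
  val TopicSchema: Schema =
    Schema("TopicName" -> Utf8, "Partitions" -> ArrayOf(PartitionSchema))
  val FetchRequestProtocol: Schema =
    Schema("ReplicaId" -> Int32,
           "MaxWaitTime" -> Int32,
           "MinBytes" -> Int32,
           "Topics" -> ArrayOf(TopicSchema))
}
```

Naming the nested groups explicitly is one possible design choice; it keeps every slot addressable by name, at the cost of drifting slightly from the BNF.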

      Once defined this schema can be used to parse messages:
      val record: Record = FetchRequestProtocol.readFrom(buffer)
      A record is just a wrapper around an array. The readFrom method parses out the fields specified in the schema and populates the array. Fields in the record can be accessed by name, e.g.
      record("ReplicaId")
      For common access this is probably good enough. However since the position is fixed, it is also possible to get the element by a Field object, which gets rid of the hashmap lookup and goes directly to the right slot. E.g.
      val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
      ...
      record(ReplicaIdField)
      This will be for cases where we are a bit performance conscious and don't want to do umpteen hashmap lookups to resolve string field names.
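
The two access paths described above can be sketched as follows. This is a simplified illustration, not the proposed implementation: it handles only flat schemas, the `read` method on each type is hypothetical, and the string encoding (an int16 length followed by UTF-8 bytes) is an assumption about the wire format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical type descriptors that know how to read themselves from a buffer.
sealed trait Type { def read(buf: ByteBuffer): Any }
case object Int32 extends Type { def read(buf: ByteBuffer): Any = buf.getInt() }
case object Int64 extends Type { def read(buf: ByteBuffer): Any = buf.getLong() }
case object Utf8 extends Type {
  // Assumed encoding: int16 length, then that many UTF-8 bytes.
  def read(buf: ByteBuffer): Any = {
    val bytes = new Array[Byte](buf.getShort().toInt)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}

final case class Field(name: String, tpe: Type, index: Int)

// A record is just a wrapper around an array of parsed values.
final class Record(val schema: Schema, val values: Array[Any]) {
  def apply(name: String): Any = values(schema(name).index) // one map lookup
  def apply(field: Field): Any = values(field.index)        // direct slot access
}

final class Schema(defs: Seq[(String, Type)]) {
  val fields: Vector[Field] =
    defs.zipWithIndex.map { case ((name, tpe), i) => Field(name, tpe, i) }.toVector
  private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

  def apply(name: String): Field = byName(name)

  // Parse each field in schema order and store it in its slot.
  def readFrom(buf: ByteBuffer): Record =
    new Record(this, fields.map(_.tpe.read(buf)).toArray[Any])
}

object Schema {
  def apply(defs: (String, Type)*): Schema = new Schema(defs)
}

object ReadDemo {
  val HeaderSchema: Schema =
    Schema("ReplicaId" -> Int32, "MaxWaitTime" -> Int32, "MinBytes" -> Int32)
  val ReplicaIdField: Field = HeaderSchema("ReplicaId") // resolved once, reused

  def parse(): Record = {
    val buf = ByteBuffer.allocate(12)
    buf.putInt(-1).putInt(100).putInt(1)
    buf.flip()
    HeaderSchema.readFrom(buf)
  }
}
```

Here `ReadDemo.parse()("ReplicaId")` and `ReadDemo.parse()(ReadDemo.ReplicaIdField)` return the same value; only the second avoids the name lookup.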

      Likewise the other direction, to write out a record:
      record.writeTo(buffer)
      and to get the size in bytes:
      record.size
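
A rough sketch of the write and size paths, under the same caveats: the `write` and `sizeOf` methods on each type are hypothetical, the record here carries a plain list of types rather than a full schema to keep the example short, and strings are assumed to be encoded as an int16 length followed by UTF-8 bytes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical type descriptors that can write a value and report its size.
sealed trait Type {
  def write(buf: ByteBuffer, value: Any): Unit
  def sizeOf(value: Any): Int
}
case object Int32 extends Type {
  def write(buf: ByteBuffer, value: Any): Unit = buf.putInt(value.asInstanceOf[Int])
  def sizeOf(value: Any): Int = 4
}
case object Int64 extends Type {
  def write(buf: ByteBuffer, value: Any): Unit = buf.putLong(value.asInstanceOf[Long])
  def sizeOf(value: Any): Int = 8
}
case object Utf8 extends Type {
  private def encode(value: Any): Array[Byte] =
    value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
  // Assumed encoding: int16 length, then the UTF-8 bytes.
  def write(buf: ByteBuffer, value: Any): Unit = {
    val bytes = encode(value)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }
  def sizeOf(value: Any): Int = 2 + encode(value).length
}

// Simplified record: parallel vectors of types and values.
final class Record(types: Vector[Type], values: Vector[Any]) {
  // Total serialized size is the sum of the per-field sizes.
  def size: Int = types.zip(values).map { case (t, v) => t.sizeOf(v) }.sum

  // Write every field in order using the one generic code path.
  def writeTo(buf: ByteBuffer): Unit =
    types.zip(values).foreach { case (t, v) => t.write(buf, v) }
}

object WriteDemo {
  val record = new Record(Vector(Utf8, Int32, Int64), Vector("logs", 0, 42L))

  def serialize(): ByteBuffer = {
    val buf = ByteBuffer.allocate(record.size) // size the buffer exactly
    record.writeTo(buf)
    buf.flip()
    buf
  }
}
```

Because `size` and `writeTo` walk the same type list, the buffer allocation and the bytes written cannot drift apart the way hand-written per-API code can.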

      Implementing a single read, write, and size method with generic schemas will not only make the underlying protocol clearly defined but also ensure good error handling, error reporting, etc. It will be a bit slower, though perhaps not by much, because we can optimize this code.

      I do realize that this is essentially what Avro, Thrift, or Protocol Buffers do, but I think this is much simpler, and can be implemented in a few hundred lines of code with no dependencies. Furthermore, it is a way to implement our protocol, not a way to define a protocol.

      In terms of how we use this, this is what I have in mind:

      I think we should split the APIs into a generic and a specific portion, with the generic piece being the header shared by all requests and responses, and the specific portion being the bits for that message. I recommend we officially implement versioning by allowing multiple versions of the schemas and always looking up the right schema for the incoming and outgoing messages. I think we can keep the existing case classes, and just map the Scala objects to and from the record instances in a wrapper layer prior to the existing KafkaApis. The KafkaApis.handle method would disappear, and instead this wrapper would handle message deserialization and call the right method with the right request object.
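
One way the versioned lookup could work is sketched below. It is only an illustration under stated assumptions: the `SchemaRegistry` object is hypothetical, `Schema` is reduced to a named stand-in, the header is assumed to begin with an int16 API key followed by an int16 version, and the registered keys and versions are made-up sample entries.

```scala
import java.nio.ByteBuffer

// Stand-in for the proposed Schema class; only its identity matters here.
final case class Schema(name: String)

// Hypothetical registry mapping (api key, version) to the schema of the
// message-specific portion.
object SchemaRegistry {
  private val schemas: Map[(Short, Short), Schema] = Map(
    (1.toShort, 0.toShort) -> Schema("FetchRequest v0"),
    (1.toShort, 1.toShort) -> Schema("FetchRequest v1")
  )

  def lookup(apiKey: Short, version: Short): Option[Schema] =
    schemas.get((apiKey, version))

  // Read the generic header fields, then choose the schema for the rest.
  // An unknown key or version becomes an explicit error instead of a
  // parse failure somewhere further down.
  def schemaFor(buf: ByteBuffer): Either[String, Schema] = {
    val apiKey = buf.getShort()
    val version = buf.getShort()
    lookup(apiKey, version)
      .toRight(s"No schema for api key $apiKey, version $version")
  }
}
```

The wrapper layer would then parse the remaining bytes with the returned schema and map the resulting record onto the matching case class before calling the handler.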

              People

              • Assignee:
                jkreps Jay Kreps
                Reporter:
                jkreps Jay Kreps
              • Votes:
                0
                Watchers:
                3
