Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:
      None

      Description

      There are a couple of things in the protocol that are not ideal. It would be good to tweak these for 0.8 so we start clean.

      Here is a set of problems and proposals:

      Problems:
      1. Correlation id is not used across all the requests. I don't think it can work as intended because of this.
      2. On reflection I am not sure that we need a correlation id field. I think that since we need to guarantee that processing is sequential on any particular socket we can correlate with a simple queue (e.g. as the client sends messages it adds them to a queue, and as it receives responses it just correlates to whatever is at the head of the queue). A minimal sketch of this queue approach appears after this list.
      3. The metadata response seems to have a number of problems. Among them is that it weirdly repeats all the broker information many times. The response includes the ISR, leader (maybe), and the replicas. Each of these repeats all the broker information. This is super weird. I think what we should be doing here is including all broker information for all brokers and then just having the appropriate ids for the isr, leader, and replicas.
      4. For topic discovery I think we need to support the case where no topics are specified in the metadata request and for this return information about all topics. I don't think we do this now.
      5. I don't understand what the creator id is.
      6. The offset request and response is not fully thought through and should be generalized.
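
      To make the queue idea in problem 2 concrete, here is a minimal sketch, assuming a hypothetical client-side class (not actual Kafka code): because the broker processes a socket's requests strictly in order, a response always belongs to the oldest outstanding request.

      import java.util.ArrayDeque

      // Hypothetical sketch: correlate responses to requests purely by queue order.
      class InFlightRequests[Req] {
        private val pending = new ArrayDeque[Req]()

        // Record a request when it is written to the socket.
        def sent(request: Req): Unit = pending.addLast(request)

        // A response always matches the oldest outstanding request, since the
        // broker handles requests on one socket sequentially.
        def completeNext(): Req = pending.pollFirst()
      }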

      Proposals:
      1, 2. Correlation id. This is not strictly needed, but it may be useful for debugging to be able to trace a particular request from client to server. So we will extend this across all the requests.
      3. For metadata response I will try to fix this up by normalizing out the broker list and having the isr, replicas, and leader field just have the node id.
      4. This should be uncontroversial and easy to add.
      5. Let's remove creator id, it isn't used.
      6. Let's generalize offset request. My proposal is below:

      Rename TopicMetadata API to ClusterMetadata, as this will contain all the data that is known cluster-wide. Then let's generalize the offset request to be PartitionMetadata--namely stuff about a particular partition on a particular server.

      The format of PartitionMetadata would be the following:

      PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime MaxSegmentInfos]]
      TopicName => string
      PartitionId => uint32
      MinSegmentTime => uint64
      MaxSegmentInfos => int32

      PartitionMetadataResponse => [TopicName [PartitionMetadata]]
      TopicName => string
      PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset HighwaterMark [SegmentData]
      SegmentData => StartOffset LastModifiedTime
      LogSize => uint64
      NumberOfSegments => int32
      LogEndOffset => int64
      HighwaterMark => int64

      This would be general enough that we could continue to add to it for any new pieces of data we need.
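
      As a rough illustration of how a client might encode this request, here is a sketch in Scala (the 4-byte array length prefixes and 2-byte string length prefix are assumptions following the conventions used elsewhere in the protocol; the type and method names are hypothetical):

      import java.nio.ByteBuffer
      import java.nio.charset.StandardCharsets

      // Hypothetical per-partition query mirroring the request fields above.
      case class PartitionQuery(partitionId: Int, minSegmentTime: Long, maxSegmentInfos: Int)

      def writePartitionMetadataRequest(buffer: ByteBuffer,
                                        topics: Map[String, Seq[PartitionQuery]]): Unit = {
        buffer.putInt(topics.size)                    // assumed array length prefix
        for ((topic, queries) <- topics) {
          val nameBytes = topic.getBytes(StandardCharsets.UTF_8)
          buffer.putShort(nameBytes.length.toShort)   // TopicName => string
          buffer.put(nameBytes)
          buffer.putInt(queries.size)                 // assumed array length prefix
          for (q <- queries) {
            buffer.putInt(q.partitionId)              // PartitionId => uint32
            buffer.putLong(q.minSegmentTime)          // MinSegmentTime => uint64
            buffer.putInt(q.maxSegmentInfos)          // MaxSegmentInfos => int32
          }
        }
      }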

      Attachments

      1. KAFKA-642-v1.patch
        35 kB
        Jay Kreps
      2. KAFKA-642-v2.patch
        58 kB
        Jay Kreps
      3. KAFKA-642-v3.patch
        58 kB
        Jay Kreps
      4. KAFKA-642-v4.patch
        65 kB
        Jay Kreps
      5. KAFKA-642-v6.patch
        60 kB
        Jay Kreps
      6. KAFKA-642-remove-response-versions.patch
        26 kB
        Jay Kreps


          Activity

          Jay Kreps added a comment -

          It also baselines all versions, including the message magic byte back to 0 for consistency.

          Jay Kreps added a comment -

          This patch removes the response version. There was a discussion on this on the mailing list. This is a fairly straightforward patch so I am going to go ahead and check it in, but throwing it up here for review.

          Jay Kreps added a comment -

          Ack, you're right.

          Jun Rao added a comment -

          60. What you said makes sense. But from the IDE, that call maps to the following api
          def map[B, That](f: (A) ⇒ B)(implicit bf: CanBuildFrom[Repr, B, That]): That
          instead of
          def map[B](f: (A) ⇒ B): Traversable[B]
          So, it seems that some sort of implicit conversion is happening. We can keep the code as it is, as long as we understand how this conversion works.

          Jay Kreps added a comment -

          60. This isn't an implicit conversion. In Java the method to get something from a map would be myMap.get(key). In Scala the same is myMap(key) because it uses the apply() method. So ids.map(brokers) means the same as ids.map(brokers.get) would in Java (if Java had a map() method). I think this is pretty natural and not magical, but I share your distrust of magic and am open to changing it if you object.
          61. Yes, I think that all operators have lower precedence than method calls. This works for the same reason a.toInt + b.toInt works. But I agree parens make it explicit, I will add them.
          62. This is a very good catch, I had meant to propose that and I just forgot. I will do it.
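
          For readers following along, a tiny self-contained illustration of points 60 and 61 (made-up values, not code from the patch):

          // A Scala Map is also a Function1, so it can be passed straight to map():
          val brokers = Map(1 -> "broker-1", 2 -> "broker-2")
          val ids = List(1, 2)
          val replicas = ids.map(brokers)        // same as ids.map(id => brokers(id))

          // Method calls bind tighter than operators, so no parens are strictly needed:
          val sum = "1".toInt + "2".toInt        // parses as ("1".toInt) + ("2".toInt)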

          Jun Rao added a comment -

          Another thing.

          62. FetchResponsePartitionData.initialOffset is redundant (since it's specified in FetchRequest) and is not really used. I am wondering if we should get rid of it from the wire protocol.

          Jun Rao added a comment -

          Thanks for the patch. A couple of minor comments:

          60. TopicMetadata: The following statement works like magic. It relies on an implicit conversion from int to Broker based on the map (brokers).
          val replicas = replicaIds.map(brokers)
          Is this the recommended scala way? Is it more easily understood if we write it in the following way?
          val replicas = replicaIds.map(id => brokers(id))

          61. TopicMetadataResponse: In the following statement, do we know how the precedence of ++ over . works? Should we add brackets to make it clearer?
          val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect { case Some(l) => l }
          Jay Kreps added a comment -

          Neha:
          12. No, that should be fine, right? The alternative is actually trying to make it work properly everywhere but that seems pointless since it involves grandfathering up through lots of oddball apis like the two random variants on earliestOrLatestOffset()...I think we should just clean all that up later rather than half-ass it now.
          13. Oooh, very nice catch. Would have been super awesome to be debugging that the first time someone plugged in a non-ascii client id.

          Joel:
          Ach, thanks for reminding me. Will file a ticket.

          Checking in with a fix for 13.

          Joel Koshy added a comment -

          +1

          BTW, is there a separate jira open on fetching metadata for all topics if none are given to the metadata request?

          Neha Narkhede added a comment -

          +1. Minor observations -

          12. In OffsetRequest, I thought you didn't want to default correlationId to 0, no?
          13. In TopicMetadataRequest.sizeInBytes, there is a bug in the way we compute the length for clientId. I believe it should use shortStringLength(clientId) instead of clientId.length
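
          A quick illustration of why the character count is wrong here (illustrative values; the wire format needs the UTF-8 byte length, which differs from the character count for non-ASCII ids):

          import java.nio.charset.StandardCharsets

          val clientId = "prodücer"                                          // non-ASCII client id
          val charCount = clientId.length                                    // 8 characters
          val byteCount = clientId.getBytes(StandardCharsets.UTF_8).length   // 9 bytes on the wire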

          Jay Kreps added a comment -

          Patch v6.

          • Rebased to 0.8 head.
          • Removed changes to ClientUtils since they conflict and aren't really related to protocol changes.
          Jay Kreps added a comment -

          Updated patch. This patch also makes the message format consistent with the other serialization formats for both key and value. Now both are a 4 byte size followed by an N byte payload.
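
          A minimal sketch of the described framing, assuming a 4-byte big-endian size followed by the payload, applied identically to key and value:

          import java.nio.ByteBuffer

          // Write one length-prefixed field: a 4-byte size, then N payload bytes.
          def writeSized(buffer: ByteBuffer, payload: Array[Byte]): Unit = {
            buffer.putInt(payload.length)
            buffer.put(payload)
          }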

          I have intentionally not made everything null safe since I think that is a larger change. We can do that in 0.8.1. This is just the binary format change.

          I also haven't rebased yet, will do that when this passes review since git is still down and I don't want to do two svn rebases.

          Jay Kreps added a comment -

          Missed two files on that last patch...

          Jay Kreps added a comment -

          Updated patch addresses issues as per points above. Still not rebased...awaiting love from apache infra folks to revive our git.

          Jay Kreps added a comment -

          I should clarify that my goal here is to make the most minimal change that fixes the client protocol for at least the user-facing apis. I also wussed out on trying to generalize offsetrequest/response. Basically I think trying those things at this point would just take too long, and we are trying to stabilize and release.

          So I made some of the changes you recommend, but some I think are bigger, and my hope was to hold off on those.

          For example my goal is not to implement correlation id, just add it to the protocol. To properly handle correlation id we need to make it so that we have a single counter across all requests on a single connection which is hard to do right now. I have some thoughts on generalizing some of our serialization and request handling stuff which I started to discuss in KAFKA-643. All I want to do now is fix as much of the protocol as I can while breaking as little as possible in the process.

          1. Agreed, fixed.

          2. Ack, I missed the correlation id in OffsetResponse. I had intended to leave it out of the non-public apis since this was meant to be a minimal change, but it is easy to add so I will do so. This should simplify future upgrades.

          3.1 Yeah, but see above comment.
          3.2 I mean properly speaking having a default correlation id doesn't really make sense does it? Anything other than a per-connection counter is basically a bug...

          4. No, it is a signed int so it should be fine for it to roll over every 4 billion requests per connection, that will take a while.

          5. Good point.

          6. Done

          7. See above comment on correlationId

          8. Did it for DefaultEventHandler as that is easy, cowardly not attempting for consumer.

          9.1 Done.
          9.2 Deleted, should not duplicate protocol docs.
          9.3 I chickened out on this. We will have to do it as a follow-up post 0.8 item.

          10.1 Agreed, but this is several weeks of work I think. This is a pretty big refactoring. Some thoughts on KAFKA-643.

          11. Yeah, I mean basically that constructor shouldn't exist at all since it isn't setting client id either.

          Neha Narkhede added a comment -

          Thanks for the patch, reviewed the one that applies on trunk.
          1. Broker
          I guess we can delete the debug statement in sizeInBytes(). It's weird that we have it.

          2. KafkaApis
          2.1 All response objects besides OffsetRequest, LeaderIsrResponse and StopReplicaResponse have a correlation id. I'm guessing correlation id can be a request level thing that is included on every Kafka request-response object. Another benefit of doing that is the ability to log the correlation id as part of the trace statement in RequestChannel -
          trace("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d"
          .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
          This will greatly simplify troubleshooting. Thoughts?
          2.2 The OffsetRequest takes in correlation id but we don't return it as part of the OffsetResponse.

          3. OffsetRequest
          3.1 The correlationId is not passed into the constructor after being read from the byte buffer.
          3.2 Probably better to define a DefaultCorrelationId somewhere. It will be useful elsewhere in the code.

          4. Correlation id is a per-request id; does it make sense for it to be of type long instead?

          5. SyncProducerConfig
          I think we should get rid of producer.request.correlation_id

          6. ProducerPool
          6.1 Remove the unused import java.net.InetSocketAddress

          7. SimpleConsumer
          7.1 It probably makes sense to allow earliestOrLatestOffset to take in the correlation id that defaults to 0. This is because ZookeeperConsumerConnector should use a correlation id that increments every time it sends a fetch/offset request to the Kafka brokers.

          8. ZookeeperConsumerConnector & DefaultEventHandler
          It will be nice if these classes set an ever-increasing correlation id as part of every request they send to Kafka (a minimal sketch of such a counter appears after this list)

          9. TopicMetadata
          9.1 Remove unused import KafkaException
          9.2 The documentation of the topic metadata request format is broken.
          9.3 Rename TopicMetadata API to ClusterMetadata ?

          10. PartitionMetadata
          10.1 In the constructor, it makes sense for the list of replicas to just be the broker ids, not the full Broker object. This will simplify PartitionMetadata.readFrom() and TopicMetadata.readFrom() as well.

          11. TopicMetadataRequest
          11.1 Can we let the following constructor take in the correlation id instead of hardcoding it to 0?
          def this(topics: Seq[String])
          The reason is this is used by ClientUtils.fetchTopicMetadata, which, in turn, is used elsewhere in ZookeeperConsumerConnector and Producer, where we'd like to track the requests by correlation id.
          11.2 The order of the constructor arguments in the scala & java api TopicMetadataRequest is very different. Elsewhere, in the code, versionId is first, followed by correlationId.
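
          A minimal sketch of the counter idea in point 8 (hypothetical class name, not the actual consumer/producer code):

          import java.util.concurrent.atomic.AtomicInteger

          // Each client instance stamps every outgoing request with a fresh id.
          class CorrelationIdGenerator {
            private val counter = new AtomicInteger(0)
            def next(): Int = counter.getAndIncrement()
          }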

          Neha Narkhede added a comment -

          I guess we want to include this on the 0.8 branch as well, right? The patch is probably created off of trunk, so it doesn't apply on the 0.8 branch. Do you mind uploading one that will apply on 0.8?

          Jay Kreps added a comment -

          Appears to pass basic system tests.

          Jay Kreps added a comment -

          This patch implements the changes described above with the following exceptions:
          1. I punted on fixing OffsetRequest. This change kind of depends on the log refactoring and is somewhat larger than I expected. It would be nice to fix it but I was going to do that as a separate patch and maybe not for 0.8.
          2. I also changed instances where we were using shorts for array lengths. There were a few of these and it complicates the protocol definition since you can't have a general definition of an array.
          3. I changed ClientUtils to not require Broker instances, since that is crazy.

          OffsetRequest, TopicMetadataRequest

          • Add correlation id. Not all are being set, but the point is just to get it in the protocol

          TopicMetadata

          • Change the serialization format so that we store only broker ids, not full brokers
          • "no leader" is encoded as leader_id=-1
          • The object itself doesn't change
          • Change sizes to all be 4 bytes to be consistent with all other arrays

          TopicMetadataResponse

          • Add broker list to response. This is guaranteed to have all "relevant" brokers--i.e. all leaders and replicas for topics included in the request
          • Add correlation id

          ClientUtils

          • fetchTopicMetadata should take a list of addresses not a list of brokers

          Broker

          • remove creatorid

          Other files

          • carry through the above changes (i.e. pass in the new argument)

            People

            • Assignee:
              Jay Kreps
              Reporter:
              Jay Kreps
            • Votes:
              0
              Watchers:
              4
