Details

      Description

      We want to change the producer/consumer request/response format according to the discussion in the following wiki:

      https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal

      Attachments

      1. KAFKA-240-FetchRequest-v1.patch (129 kB, Prashanth Menon)
      2. KAFKA-240-FetchRequest-v2.patch (127 kB, Prashanth Menon)
      3. KAFKA-240.v3.patch (150 kB, Joe Stein)
      4. KAFKA-240-FetchRequest-validation-v1.patch (10 kB, Prashanth Menon)
      5. KAFKA-240.ProducerRequest.v2.patch (52 kB, Joe Stein)
      6. KAFKA-240.ProducerRequest.v3.patch (53 kB, Joe Stein)
      7. KAFKA-240.ProducerRequest.v4.patch (56 kB, Joe Stein)
      8. kafka-240_unittestfix_delta.patch (11 kB, Jun Rao)


          Activity

          Joe Stein added a comment -

          Committed the missing file and confirmed it compiles on svn update, with all tests passing.

          Resolving. I think this ticket is all good now; I do not see anything missing. Everything falling out from this ticket is being tracked in other JIRAs.

          Jun Rao added a comment -

          I got a compilation error on 0.8.

          /home/jrao/Intellij_workspace/kafka_0.8/core/src/main/scala/kafka/server/KafkaApis.scala:61: not found: type ProducerResponse
          private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {

          Joe Stein added a comment - edited

          9) done
          10) done

          changed member vars to camel case for consistency

          tests passing

          committed to 0.8 branch

          Joe Stein added a comment -

          Thanks Jun, I get the issue now. Right, so after I read the byte buffer I need to return a ProducerRequest constructed with the versionId, which right now is just a dangling var. OK, got it.

          Jun Rao added a comment -

          Joe,

          When the client creates a ProduceRequest, we don't need the client to specify the version_id explicitly, since it's embedded in the client library. However, when we deserialize received bytes from a client into a ProduceRequest on the broker, we don't want to pick up the version_id that the server is on. Instead, we need to know the version of the client. So, we will need 2 constructors for ProduceRequest, one with version_id and one without. The former will be used on the broker and the latter will be used in the producer client.
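
          A minimal sketch of the two-constructor idea, with simplified placeholder fields (TopicData here is a stand-in, not the actual class):

          object ProducerRequest {
            val CurrentVersion: Short = 0  // version baked into this client library
          }

          case class TopicData(topic: String, payload: Array[Byte])  // placeholder

          case class ProducerRequest(versionId: Short,       // broker side: version read off the wire
                                     correlationId: Int,
                                     clientId: String,
                                     data: Array[TopicData]) {
            // Producer-client constructor: the client never passes a version explicitly;
            // it always sends the version its library was compiled with.
            def this(correlationId: Int, clientId: String, data: Array[TopicData]) =
              this(ProducerRequest.CurrentVersion, correlationId, clientId, data)
          }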

          Joe Stein added a comment -

          It is written to the buffer, though it is not passed in when constructing the ProducerRequest:

          val version_id: Short = ProducerRequest.version_id

          happens during construction.

          Since the actual changes come in the code, I thought having this at the object level, as part of the jar the publisher was calling, made the most sense. It seemed that the client might not need or want to have this knowledge, but I can pass it into the constructor, not a problem.

          Also, Prashanth pointed out that my member vars are not consistent with the rest of the system (camel case); I will make that change as well to be consistent.

          Jun Rao added a comment -

          Yes, however, that information is not passed into ProduceRequest. So the server doesn't know the version of the producer client.

          Joe Stein added a comment -

          Hi Jun, on #9

          def readFrom(buffer: ByteBuffer): ProducerRequest = {
          val version_id: Short = buffer.getShort

          That is already there; do you mean something else, or different from what I am looking at?

          Jun Rao added a comment -

          Joe,

          kafka-239 has been committed to the 0.8 branch. You are good to go. After rebasing your code, make sure all unit tests still pass.

          Joe Stein added a comment -

          OK, sounds good. I will commit the changes with these last 2 issues after rebasing on kafka-239 once it is committed to the branch, np.

          Jun Rao added a comment -

          Joe,

          There are 2 reasons why the unit test fails.
          1. Some default values for ProducerRequest are not set consistently.
          2. The bigger issue is that in Scala, Array equality only tests reference equality, not the actual values. This is different from how other Seq classes like List behave. To fix that, I had to explicitly define equals in ProduceRequest and TopicData (see the sketch below).
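
          To make the Array point concrete, a minimal illustration (simplified fields; not the actual patch):

          // Sketch only: Array-typed members break the generated case-class equality.
          case class TopicData(topic: String, partitions: Array[Int]) {
            // The generated equals compares the Array by reference, so compare element-wise
            // instead (a matching hashCode override is omitted here for brevity).
            override def equals(other: Any): Boolean = other match {
              case that: TopicData => topic == that.topic && partitions.sameElements(that.partitions)
              case _ => false
            }
          }

          // Array(1, 2) == Array(1, 2)                                  // false: reference equality
          // List(1, 2) == List(1, 2)                                    // true: structural equality
          // TopicData("t", Array(1, 2)) == TopicData("t", Array(1, 2))  // true with the override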

          Attached is a patch of only 4 scala files that I modified. Apply your v4 patch, revert those 4 files and apply my patch. The unit test passes now.

          A few other things:
          9. When deserializing bytes to ProducerRequest, we need to set the version id.
          10. In KafkaApi, we should send no ack on ProducerRequest since the producer is not reading the response yet. Add a comment that this will be fixed in kafka-49.

          Once the above 2 issues are addressed, you can just commit without further review. Finally, would you mind committing after kafka-239? That patch changed quite a few unit tests and may take a bit more time to refactor.

          Joe Stein added a comment -

          I forgot to remove these 2 files, which are not needed anymore:
          svn delete .//core/src/main/scala/kafka/api/MultiProducerRequest.scala

          svn delete .//core/src/main/scala/kafka/server/MultiMessageSetSend.scala

          only change from v3

          Joe Stein added a comment -

          1) Done. hashCode is implemented by the Scala runtime for case classes, which interrogates all members (http://lampsvn.epfl.ch/trac/scala/browser/scala/branches/2.8.x/src/library/scala/runtime/ScalaRunTime.scala#L151); I removed it for now.
          2) KAFKA-288
          3) Done. Done.
          4) Done. Done. But still getting the failure in kafka.producer.AsyncProducerTest.
          5) Errr, I don't see this; it looks OK to me. Maybe something in how the patch got applied? I tested it again on another box with what I am uploading, no issue.
          6) Done.
          7) KAFKA-288
          8) KAFKA-289

          Jun Rao added a comment -

          If you are not addressing #8 in this jira, we can track #2 and #8 together in a separate jira.

          Joe Stein added a comment -

          for #2

          The reason I left send(topic: String, messages: ByteBufferMessageSet) was that I was unsure what we wanted to do with the resulting changes: KafkaRecordWriter and DataGenerator would both need to start to implement and pass through data, or I could stub things out so it compiles and have separate tickets for each of those implementations, which I do not know much about at all (yet).

          Jun Rao added a comment -

          Joe,

          Thanks for the patch. We are almost there. Some comments:

          1. ProducerRequest is a case class, so the equals() you want is always defined for you. No need to redefine it. Not sure what Scala does for hashCode for a case class and whether we should override it or not.
          2. javaapi.SyncProducer: We should get rid of send(topic: String, messages: ByteBufferMessageSet) and only keep send(producerRequest: kafka.javaapi.ProducerRequest).
          3. KafkaApis: We should get rid of the handling of MultiProduce and only support Produce requests. Also, in handleProducerRequest, we should return None if request.required_acks == 0 (not ack_timeout == 0).
          4. DefaultEventHandler.send(): use ProducerRequest without the package name. Also, use the values in ProducerConfig to fill in parameters (like clientid) of ProducerRequest (this is probably what's causing the unit test testBrokerListAndAsync to fail).
          5. FetchResponse: broken lines in PartitionData
          6. javaapi.FetchResponse: could you make data a private member?
          7. The way that SyncProducer sends a ProducerRequest over the socket is to first serialize the whole request into a bytebuffer and then send the bytebuffer through the socket. An alternative is to send the request like FetchResponse, using a ProduceRequestSend that reuses TopicDataSend. This avoids code duplication and is more efficient, since it sends the data in the ByteBufferMessageSet directly to the socket and avoids extra copying from the messageset to the bytebuffer.
          8. javaapi.ProducerRequest: We will need to define a java version of TopicData so that java producers can create request conveniently. The java version of TopicData will use the java version of ByteBufferMessageSet.

          For #7 and #8, it's fine to have them fixed in separate jiras (just link the jiras here).

          Joe Stein added a comment -

          Attached are my changes from the last round of feedback and rebased on the branch from commits.

          Still have 1 failing test

          [error] Test Failed: testBrokerListAndAsync

          and I still need to put the offsets in the response

          other items changed and related refactoring done

          Joe Stein added a comment -

          Cool, so I need to rebase, update my changes for the few issues that were called out, and submit a new patch. Not sure if I will have time tomorrow, but by Monday; will try sooner though, np.

          Jun Rao added a comment -

          Thanks Prashanth, excellent patch. Committed Validation v1 patch to 0.8.

          Prashanth Menon added a comment -

          Validation v1 patch attached. You should be able to apply it against 0.8. Pre-emptive apology to Joe, who might need to upload a new patch.

          Jun Rao added a comment -

          Yes, we should enforce the same check in the broker (mostly for non-java clients).

          Prashanth Menon added a comment -

          Good point. I can make that change in this ticket rather than creating a new one.

          If so, I'd just get rid of the builder and put that grouping logic into the request itself. Each client will have to do likewise and the broker can enforce the grouping as well to be safe?

          Jun Rao added a comment -

          Prashanth,

          Another thing. We don't enforce that a given topic appears in at most 1 OffsetDetail in a FetchRequest. We will need to do that since FetchResponse code assumes that.
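
          A minimal sketch of such a uniqueness check, assuming an OffsetDetail reduced to a bare topic field (the actual validation patch may differ):

          // Sketch only: reject a FetchRequest whose topic appears in more than one OffsetDetail.
          case class OffsetDetail(topic: String /* , partitions, offsets, fetchSizes ... */)

          def validateUniqueTopics(offsetInfo: Seq[OffsetDetail]): Unit = {
            val duplicated = offsetInfo.groupBy(_.topic).collect { case (topic, details) if details.size > 1 => topic }
            if (duplicated.nonEmpty)
              throw new IllegalArgumentException(
                "FetchRequest has multiple OffsetDetails for topics: " + duplicated.mkString(", "))
          }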

          Prashanth Menon added a comment - edited

          Jun, ah, thanks for catching the missing changes in the java api. Regarding the second point, you're completely right. That change was made to simplify reading the response at the client, since the ByteBufferMessageSet requires an initial offset; this offset can always be set after the fact. Ticket is KAFKA-271.

          Jun Rao added a comment -

          Prashanth,
          Thanks for the patch. Just committed KAFKA-240-FetchRequest-v2.patch with a minor fix (javaapi.FetchResponse.messageSet returns ByteBufferMessageSet instead of MessageSet, to make it consistent with the scala api). Technically, Response doesn't need initialoffset, since it can be obtained from the fetch request. Could you open another jira to get this fixed?

          Joe,

          Thanks for the patch. Some comments:
          1. ProduceResponse: our original plan was to make it optional, i.e., only send response if ack is not 0.
          2. syncProducer.send should have 1 input parameter, ProduceRequest. Just like SimpleConsumer. Also, the scala version of syncProducer should not use javaapi.ProduceRequest.
          3. ProducerRequest should use TopicData, instead of WiredData.

          As for your questions.
          1) One possibility is to add a method getSerialized() in MessageSet. Only ByteBufferMessageSet will implement this method. FileBufferMessageSet can throw a non-supported exception (see the sketch below).
          3) Offsets should be the offset of the log after the message sets are appended. We may need to change log.append to return a MessageSetAndOffset.
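
          A rough sketch of option 1), with illustrative names and signatures only:

          import java.nio.ByteBuffer

          // Sketch only: not the real MessageSet hierarchy.
          abstract class MessageSet {
            def getSerialized(): ByteBuffer
          }

          class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
            // Already backed by a ByteBuffer, so it can hand out a duplicate directly.
            override def getSerialized(): ByteBuffer = buffer.duplicate()
          }

          class FileBufferMessageSet extends MessageSet {
            // File-backed sets are streamed to the socket and never materialized in memory.
            override def getSerialized(): ByteBuffer =
              throw new UnsupportedOperationException("file-backed message sets cannot be serialized to a ByteBuffer")
          }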

          Joe Stein added a comment - edited

          here is v3 patch for ProducerRequest changes and ProducerResponse also inclusive of v2 changes for Fetch

          note a few things

          1) I did not integrate the topic & partition data classes from our two changes. I ran into a bump with not being able to get the buffer from the abstract MessageSet and figured we can do this later; I wanted to get this up and reviewed to see if anything else is left. Prashanth, if you have some ideas here let me know.

          2) There is still a failed test in AsyncPoolTest

          3) I was not sure what to do with errors and offsets in KafkaApi handleProducerRequest when building the ProducerResponse, i.e. what the values should be in that block.

          Joe Stein added a comment -

          Hey guys, so I started integrating my changes over Prashanth's changes (which are over the 0.8 branch now).

          I see now PartitionData has an error code for each partition (and the comments above in regards to that) ... the thing is that my ProducerResponse does not use this and has its own array of errors (which makes sense).

          So... do we leave out the error in PartitionData (which is how the wiki has it), or do I use PartitionData and just not implement the error, or do I use my own PartitionData class (like I was doing before with WiredPartition)?

          Prashanth Menon added a comment - edited

          1, 2, 3. Done, patch attached.

          I also changed the wire format so that the versionId is sent earlier. This should make it easier for brokers and clients to handle protocol versioning without too much bit fiddling.

          Jun Rao added a comment -

          Also, regarding the builder for FetchRequest: my concern is that it's not clear what the required parameters are.

          Joe Stein added a comment -

          Yes, sounds good. I will apply his patch and refactor my changes to use his topic and partition classes for the data structure.

          Jun Rao added a comment -

          Prashanth, thanks for the patch. Looks good overall.
          1. FetchResponse: Since we already have error code at the PartitionData level, should we get rid of the error code at the FetchResponse level? There is no obvious use case for the latter.
          2. Should FetchResponse.messageSet return ByteBufferMessageSet? That way caller doesn't have to cast.
          3. Could you rebase to the latest 0.8 branch?

          Joe, do you think we can take Prashanth's patch first and apply your patch on top later?

          Taylor Gautier added a comment -

          I think I've answered my own question - the wiki contains the description and the field correlation_id will allow for this kind of implementation. Excellent.

          Taylor Gautier added a comment -

          Hi - is there a description of the new FetchResponse? Basically I want to make sure that it includes the topic name to allow for out of order responses which will allow for a future implementation of multiplexed requests/responses that is easier and more efficient than the current method which relies strictly on ordering of the responses. This is strongly related to the async implementation that Jay is working on.

          Prashanth Menon added a comment -

          The reason I made a builder was to separate how data is stored/sent from how the client logically thinks of and creates a request. You're right though, named arguments would solve some of the problems but because of the nested nature of the object, the builder seems to help with how requests are constructed - it makes the tests cleaner and probably client code as well. Either way, I'm okay with removing it.

          There is one concern I have with the patch that I'd like to solicit feedback on. The fetch response sorts its messages by topic and partition and leverages this sorted nature to perform a binary search when retrieving message sets. This only works when the response is read from the socket by a client, not upon initial construction. This asymmetry seems to bug me for some reason. Anyone have a more elegant solution? Should we sort at all?

          Also, the error code from the fetch response needs to move out into the send class.

          Jay Kreps added a comment -

          Hey Prashanth, this is great. Question: do we need a builder class for FetchRequest versus just using named arguments? It doesn't make a big difference, but it would probably be good for us to agree on a pattern and carry it forward for all the various requests. With named arguments presumably it would be something like:

          new FetchRequest(correlationId = 2345454, versionId = 2, ...)

          Joe Stein added a comment -

          Here are my changes: https://gist.github.com/1748005 . I don't want to attach a patch yet as tests are not all passing (not sure yet if I am going to have time before Tuesday night to fix that), and I still have a few TODOs in the changes.

          feedback welcome ++

          Prashanth Menon added a comment - edited

          Hi all,

          Wasn't able to respond on Friday, but I've just wrapped up the last of the FetchRequest changes. I've attached a patch you can apply to 0.8. All tests should pass. I'll need to coordinate with Joe on how to generate a full patch, or we can leave them separate and have the committer apply them separately but consecutively.

          Details:
          1. FetchRequest completely modified along with the associated java type.
          2. New FetchResponse, TopicData and PartitionData classes. The latter *Data classes can be shared with the producer side as much of it is common.
          3. New *Send classes for FetchResponse to encapsulate send logic. Pretty straightforward.
          4. All the tests have been changed to make them multi-fetch by default. Because they make use of the response and the send, I've not added new tests for them.
          5. I've removed the MultiFetchResponse classes along with associated java types, implicit conversions, request keys and tests.

          Have a look. It's fairly massive, so any and all feedback is welcome.

          Joe Stein added a comment -

          here is the latest on the ProducerRequest https://gist.github.com/1735455

          ProducerResponse and getting test cases to work are still pending (will start on those tomorrow night) and then some cleanup TODOs

          I made the assumption that only the correlation_id will be changing for each request and the other values will be for the entire producer (setting those is a TODO in my diff).

          verifySendBuffer in SyncProducer seems to just be a tracing utility; I think turning that into a toString() after a buffer read will give the same result in the new code. I figure there will be a bunch of other tweaks (like that) as I go through the test cases, so I'm leaving this for that iteration.

          Also, I still need to gut out the multi* request handling that is now inherent in the new ProducerRequest.

          Jay Kreps added a comment -

          Cool, I have a draft of long-poll support in KAFKA-48. I am hard coding the values that are in the new fetch request (min_size, max_wait). We should discuss which goes in first since this will change the server side KafkaRequestHandler/KafkaApis code a fair amount. If you are refactoring to remove the old requests I expect that will be the bigger patch and it would be better for me to rebase to you than vice versa. But if you guys are a ways out I can go first too.

          Joe Stein added a comment -

          I am finishing up the changes for SyncProducer and should have that done later tonight.

          Once those are done I will post them here and then I will jump into fixing the tests for my changes (which I will be working on over the weekend).

          Prashanth and I have overlap in our data structure classes (in my changes they are called WiredTopic, WiredPartition); unless there is some good reason to keep these separate, like we see them diverging, we will have to fold these together across our changes.

          Jun Rao added a comment -

          Joe, Prashanth,

          Do you guys have patches that we can review now? We are getting very close to start using the new request format in 0.8.

          Thanks,

          Prashanth Menon added a comment -

          Just an FYI that I've added a few more questions related to the FetchRequest side in the wire format wiki.

          Jun Rao added a comment -

          Joe,

          The changes in github make sense. A couple of suggestions:
          1. We can probably use case class for ProduceRequest.
          2. The return type of produce request handler should be Option[ProduceResponse] since the response is optional.

          Prashanth Menon added a comment -

          Here's the updated FetchRequest: https://gist.github.com/1660067 . Still a work in progress while it's integrated into other components.

          Joe, a couple of comments:
          1. You don't need to include the "size" parameter in ProduceRequest as it's calculated in sizeInBytes() and invoked/written by BoundedByteBufferSend when transmitting on the network connection.
          2. You don't need to include the "requestTypeId", since it's automatically passed up to the Request superclass (e.g. RequestType.ProductRequest) and written out/transmitted by the same BoundedByteBufferSend.

          Otherwise, looking good. I'm expecting to wrap up by Wednesday if I can get some time in after work.

          Joe Stein added a comment -

          More progress on the producer side: https://gist.github.com/1655565 (not enough to upload a patch, but I wanted to put the svn diff up in case anyone takes a look before I start in on it again tomorrow/Monday).

          I have made a few assumptions and still need to integrate with SyncProducer, ProducerPool, and DefaultEventHandler and hook in the response, but so far so good.

          Joe Stein added a comment -

          yup, that is much simpler

          Here is my first crack at ProducerRequest read and write for the new format: https://gist.github.com/1636208

          I will start chipping away at the refactoring for this change.

          Jun Rao added a comment -

          Joe,

          I think it could be simpler than that. You can have the following encoding:

          # of topics (2 bytes)
            topic1 encoding (see encoding format below)
            ...
            topicN encoding

          Topic Encoding:
          topic name length (2 bytes)
          topic name bytes

          # of partitions (2 bytes)
            partition1 encoding (see encoding format below)
            ....
            partitionN encoding

          Partition encoding:
          partition id: (4 bytes)
          messageSet encoding
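
          A rough sketch of writing that layout to a ByteBuffer (simplified stand-in types; the real TopicData/PartitionData carry more than shown):

          import java.nio.ByteBuffer

          case class PartitionData(partition: Int, messageSetBytes: Array[Byte])   // placeholder
          case class TopicData(topic: String, partitions: Array[PartitionData])    // placeholder

          def writeTo(buffer: ByteBuffer, topics: Array[TopicData]): Unit = {
            buffer.putShort(topics.length.toShort)               // # of topics (2 bytes)
            for (td <- topics) {
              val topicBytes = td.topic.getBytes("UTF-8")
              buffer.putShort(topicBytes.length.toShort)         // topic name length (2 bytes)
              buffer.put(topicBytes)                             // topic name bytes
              buffer.putShort(td.partitions.length.toShort)      // # of partitions (2 bytes)
              for (pd <- td.partitions) {
                buffer.putInt(pd.partition)                      // partition id (4 bytes)
                buffer.put(pd.messageSetBytes)                   // messageSet encoding (abbreviated to raw bytes here)
              }
            }
          }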

          Joe Stein added a comment - edited

          << we just need a way to encode array structures

          Right, here we have arrays within arrays, so we need to have the "hint" to explain that in the position after ack_timeout.

          what I am proposing is to have a string

          X,A,B,C

          split on ","

          X is the number of elements in the topics array

          A,B,C... (etc.) would be, for each topic, how many partition array elements there are

          so this could look like

          2,2,3

          meaning 2 topic_data_struct elements: the first element of the topic array has 2 partition_data_struct elements in its array, and the second element in the topic array has 3 partition_data_struct elements

          then walking the positions becomes easy, as do the gets from the ByteBuffer when creating the objects

          if it was

          5,3,3,3,3,4 would mean 5 topics, the first 4 of them with 3 elements each and the last with 4 elements

          I will start in on this approach

          Jun Rao added a comment -

          Joe, thanks for working on this jira. Some comments:

          1. Using Thrift is one possible option for supporting multiple language bindings. If we want to use thrift, we probably want to use it for both producer and consumer. The concern that I have is that Thrift adds its own layer of RPC and may prevent us from doing things like zero-copy transfer on the consumer side. In any case, supporting multiple language binding is important, but a bigger topic. It is probably beyond the scope of this jira. I prefer that we follow it up in a separate jira.

          2. I think your last option is feasible. Basically, we just need a way to encode array structures. This can be done by encoding the array length at the beginning, followed by the encoding of each array element. Note that each array element can have nested arrays and we can encode them in the same way recursively. This is how MultiProduceRequest currently works (see the sketch below).
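
          The read side of such a length-prefixed encoding could look roughly like this (illustrative only; element types are simplified):

          import java.nio.ByteBuffer

          // Generic length-prefixed array decoding; nested arrays are handled by nesting readArray.
          def readArray[T](buffer: ByteBuffer)(readElement: ByteBuffer => T): Seq[T] = {
            val count = buffer.getShort.toInt                  // array length written at the front
            (0 until count).map(_ => readElement(buffer))
          }

          def readPartitionId(buffer: ByteBuffer): Int = buffer.getInt

          def readTopic(buffer: ByteBuffer): (String, Seq[Int]) = {
            val nameLength = buffer.getShort.toInt
            val nameBytes = new Array[Byte](nameLength)
            buffer.get(nameBytes)
            (new String(nameBytes, "UTF-8"), readArray(buffer)(readPartitionId))
          }

          // val topics = readArray(buffer)(readTopic)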

          Joe Stein added a comment -

          So, one possibility I am looking at for the data [<topic_data_struct>] is to use ByteArrayOutputStream and ObjectOutputStream to get the List[WiredTopic] into the byte buffer. My concern here, though, is non-JVM clients not being able to support it.

          Another option is making this new format Thrift-based (or protobuf, etc.). I appreciate and understand that this adds yet another dependency, yet it allows for language independence for what is now a complex data structure (not the simple 3 fields that exist now). The entire format could be a Thrift object, and the serialized bytes become a single put into the byte buffer.

          The last option I have considered is to basically loop through the data structures and advance the ByteBuffer position. So after the ack timeout we could store some hint in the byte buffer for the [] of topic -> [partition,message]; e.g. we have 2 topics being published to, with one of those topics having 2 partitions with a message set and the other having 3 partitions/message sets. So we could create some "hint" giving the number of topics and the count of partitions/messages for each topic (in this case 2,2,3), and "put" topic1, partitionA, messageA, partitionB, messageB, topic2, partitionC, messageC, partitionD, messageD, partitionE, messageE. The drawback here is some nuanced complexity (bordering on esoteric) in taking the object model, breaking it out, storing it, and then pulling the stored values back (based on the hint, so we know which position is a topic and which are partitions). The "hint" could be a delimited string, maybe (if this is the approach that is adopted): the count of topics, and for each topic the count of partitions for that topic. With 2,2,3 split on ",", [0] is the count of topics, [1] is the count of partitions/messages for topic 1, and [2] is the count of partitions/messages for topic 2.

          Might there be some other options here? Am I missing something/over-complicating? My preference is the Thrift approach, but I appreciate the "hint" approach also and would be quite alright with that too... it works with no added dependency; the drawback is that it is a tad harder to get client code ("driver") adoption.

          thoughts?

          Joe Stein added a comment -

          I have been looking at and going through the producer side so far only. I was/am scratching my head a bit on how we want to handle the arrays: whether we just want to serialize the topic (the entire thing) and put that in a position, or exactly how we want to store it from the ByteBuffer perspective.

          So in the 7th position would be ByteBuffer.wrap((data: [<topic_data_struct>]).toBytes)

          That should be it; I think that would be enough for us to serialize/deserialize it.

          If you want to go ahead and start on the consumer side, I am going to take a further crack at the producer side tonight/tomorrow; hopefully it will be finished by Wednesday, or I will know what roadblock I have hit.

          Prashanth Menon added a comment -

          I'm going to have to agree with Jun on this. I really like the idea of reusing the existing mirroring logic and custom tool to provide a data-bridge between a 0.7 and 0.8 cluster. Rollback becomes a non-issue and it allows us to fully revamp the write-protocol, properly constructing a new one that attempts to foresee any future modifications. It definitely demands more work and potentially requires duplicated environments and some orchestration, but considering the changes required in the wire format I think it's the appropriate solution.

          Joe, just wanted to quickly know if you'd mind splitting this task with me? I ask only because I had started some work on https://issues.apache.org/jira/browse/KAFKA-49 earlier. Producer or consumer, either is fine, your pick if you're okay with it.

          Jun Rao added a comment -

          Joe,

          That's a good question. We had some discussions on this in the mailing list before.

          I agree it would be nice if we can make the 0.8 release backward compatible. However, it's a little bit hard because:
          1. We need a process to migrate the ZK data structures and potentially the on-disk data organization. This is particularly hard since we need to migrate a whole kafka cluster online. In the middle of the migration, some brokers will be using the old structures and some other brokers will be using the new structures. It's not clear how a consumer should behave at this stage.
          2. If something terribly wrong happens during the migration, there is no easy way to rollback the migration.

          An alternative is to make the 0.8 release non-backward compatible. This allows us to incorporate any wire/on-disk changes freely. To upgrade from 0.7 to 0.8, one possibility is to start a new 0.8 cluster. We can provide a tool that continuously mirrors data from the 0.7 cluster to the new 0.8 cluster. Once that's done, we can first upgrade the consumers and point them to the 0.8 cluster, followed by upgrading the producers and pointing them to the 0.8 cluster. This is somewhat more operational work. However, it addresses both of the above issues.

          Joe Stein added a comment -

          so I started looking at parts of the code and the document

          There still seems to be an open question in regards to whether or not this is a breaking change.

          I think there is a lot of benefit to not having a breaking change; granted, the code will get a bit cluttered, but it is nice that we can add the new class (i.e. WireProducerRequest) in the same file as ProducerRequest along with the internal guts and workings (as suggested in the wiki: "One thought on this is that it is probably not too hard to make most of the above changes as new request types and map the old request types to the new").

          I do not understand the next line though "However if we are changing the request id and version id scheme then this will likely not be possible."

          Keeping the old format would be helpful for upgrades and for existing clients being able to operate without changes.

          Assuming that (and I would like clarification on "However if we are changing the request id and version id scheme then this will likely not be possible."), then we should agree on the naming for the new extended Request/Response format (e.g. WireXYZRequest, WireXYZResponse), or something else besides Wire (I don't really like that, but it's the first thing that came to me without much thinking about it); something to help denote the internal communications from component to component and have all those classes start the same way.


            People

            • Assignee: Unassigned
            • Reporter: Jun Rao
            • Votes: 0
            • Watchers: 0
