Kafka / KAFKA-391

Producer request and response classes should use maps

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:

      Description

      Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays.

      It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks).

      We can probably do the same in the fetch request/response.
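
      As a rough sketch of the proposed change (field and class names here are illustrative, not the actual 0.8 classes), the response would move from parallel arrays to a single map keyed by topic and partition:

      // Sketch only -- illustrative names, not the actual Kafka 0.8 classes.

      // Before: parallel arrays whose ordering must match the flattened request.
      case class ProducerResponseArraysSketch(errors: Array[Short], offsets: Array[Long])

      // After: one map keyed by (topic, partition); handling producer acks becomes a
      // constant-time lookup instead of a linear scan.
      case class ProducerResponseMapSketch(status: Map[(String, Int), (Short, Long)]) {
        def errorCode(topic: String, partition: Int): Short = status((topic, partition))._1
      }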

      Attachments

      1. KAFKA-391-v4.patch
        125 kB
        Joel Koshy
      2. KAFKA-391-v3.patch
        115 kB
        Joel Koshy
      3. KAFKA-391-v2.patch
        79 kB
        Joel Koshy
      4. KAFKA-391-draft-r1374069.patch
        37 kB
        Joel Koshy

        Activity

        Joel Koshy added a comment -

        Attached a draft patch. I was out of action for the last two weeks and 0.8
        has moved along quite a bit, so this will need a significant rebase. (This
        patch should cleanly apply on r1374069.) Before I spend time rebasing I was
        hoping to get some high-level feedback on this change. I think it makes the
        code clearer and likely more efficient (although an extensive perf test is
        pending).

        It is pretty straightforward and should be quick to review at least for this
        round. The main changes are as follows:

        • Switched to maps for the data/status in the producer request/response
          classes.
        • I'm using SortedMap as that helps with serializing to the wire format.
          The maps of course are only in memory. The wire format is mostly the same,
          although I modified the response format to include topic names (similar to
          the request format). This is not strictly required but I think it is
          clearer this way. (A rough sketch follows this list.)
        • Made some of the related code simpler/clearer (e.g., produceToLocalLog,
          DelayedProduce.respond, etc.)
        • Minor fixes in the tests due to the above changes.
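
        A rough sketch of the map-based shape and the SortedMap-driven serialization described above (names are illustrative, not the actual patch):

        import scala.collection.immutable.SortedMap

        // Sketch only: keying the payload by (topic, partition) and keeping it in a
        // SortedMap gives a deterministic ordering when writing the wire format.
        case class ProducerRequestSketch(
            correlationId: Int,
            data: SortedMap[(String, Int), Array[Byte]]) {

          def writeToWire(): Unit = {
            // Group by topic so the topic name is written once, followed by its
            // partitions -- similar to the existing request format.
            val byTopic = data.groupBy { case ((topic, _), _) => topic }
            byTopic.foreach { case (topic, partitions) =>
              println("topic=%s partitions=%s".format(topic, partitions.keys.map(_._2).mkString(",")))
            }
          }
        }
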
        Joel Koshy added a comment -

        One more comment I forgot to add above: we can/should probably get rid of the global
        error code in ProducerResponse as it is unused and does not seem to make sense
        anyway in the absence of a "generic error" code.

        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. We probably shouldn't use Scala SortedMap in kafka.javaapi.ProducerRequest. On that thought, why can't ProducerRequest take a regular map in the constructor? If we want some ordering on the serialized data, we can sort the map before serialization (see the sketch below). SortedMap seems to reveal an implementation detail that clients don't (and shouldn't) really care about. Ditto for ProducerResponse.

        2. To be consistent, should we change FetchResponse (and maybe FetchRequest) to use maps instead of arrays too?

        3. ProducerRequest.writeTo() can use foreach like the following:

           groupedData.foreach { case (topic, topicAndPartitionData) => ... }
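
        Regarding point 1, a minimal sketch (illustrative names, not the actual code) of keeping a plain Map in the API and sorting only at serialization time:

        // The public constructor accepts an ordinary Map; entries are sorted by
        // (topic, partition) only when writing to the wire.
        def writeSorted(data: Map[(String, Int), Array[Byte]]): Unit = {
          data.toSeq.sortBy { case ((topic, partition), _) => (topic, partition) }
            .foreach { case ((topic, partition), bytes) =>
              // the real code would write topic, partition and payload to the wire here
              println("%s-%d: %d bytes".format(topic, partition, bytes.length))
            }
        }
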
        Joel Koshy added a comment -

        (1) Yes that's a good point. I should have thought through it more carefully. We only need the sorted property once (serialization).
        (2) I had considered this, but decided to punt to see how the producer-side came out. I'll take a stab at the fetch side as well as part of this jira.
        (3) Nice.

        So I'll rebase now and incorporate the above.

        Joel Koshy added a comment -

        (Accidentally deleted the description.)

        Jun Rao added a comment -

        Also, I agree that we can remove the global error code from both the fetch and the produce responses.

        Joel Koshy added a comment -

        2.1 - Rebased after the first review, but I will need to rebase again. Unit
        tests and system tests pass. This patch applies cleanly to svn revision
        1381858 - it would be great if this can be reviewed against that revision. I can provide an incremental patch if that helps after I rebase.

        2.2 - Removed all the equals methods in classes that were using Arrays earlier.

        2.3 - Switched fetch request/response to maps. It makes the code a little
        cleaner, but I think the improvement was greater with the refactoring on
        the producer side. Anyway, this makes the APIs more consistent. One small
        observation: previously, fetch requests would throw a
        FetchRequestFormatException if the TopicData array for a fetch request
        contained the same topic multiple times. Right now we don't do any checks
        since we use a map. We can add it to the FetchRequestBuilder, but not sure
        if it is required/worth it. Also, I think it previously would allow
        fetches from different offsets in the same partition in the same fetch
        request. That is no longer allowed, although I don't know why anyone would
        need that.

        2.4 - Removed the global error code.

        Another minor detail: I wonder if it would help to have a case class for
        TopicAndPartition. We use (topic, partition) and the associated tuple
        addressing all over the place. That would make the Map declarations slightly
        clearer, although by now we're accustomed to reading (String, Int) as
        (topic, partitionId).

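        The kind of case class in mind here would be something like the following (a minimal sketch; the eventual class in the codebase may differ):

        // A named wrapper so map keys read as TopicAndPartition rather than an
        // anonymous (String, Int) tuple, e.g. Map[TopicAndPartition, ByteBufferMessageSet]
        // instead of Map[(String, Int), ByteBufferMessageSet].
        case class TopicAndPartition(topic: String, partition: Int) {
          override def toString = "[%s,%d]".format(topic, partition)
        }
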
        Joel Koshy added a comment -

        BTW, I forgot to remove OffsetDetail from FetchRequest - we don't need that anymore.

        Jun Rao added a comment -

        Thanks for patch v2. Some comments.

        20. It's a good idea to create a case class of TopicPartition. This helps in 2 ways: (1) Tuples don't exist in Java and javaapi.ProducerRequest currently uses a tuple. (2) This avoids things like x._1._1, which is harder to understand. Similarly, should we create a case class ProducerResponseStatus that wraps (errorCode, offset)? (See the sketches after these comments.)

        21. javaapi.ProducerRequest should use a Java map instead of a Scala map. javaapi.SyncProducer should return a Java version of ProducerResponse. Thinking about it, should we even provide a javaapi for SyncProducer, since everyone should really be using the high-level Producer api? In other words, SyncProducer probably is not our public api for clients. For consumers, we likely still need to provide a Java version of SimpleConsumer since there may be applications that want to control the fetch offset.

        23. It doesn't look like we have a Java version of FetchRequest. We should add one and use it in the Java version of SimpleConsumer.

        24. FetchResponse: We should probably add a helper method errorCode(topic, partition).

        25. AbstractFetcherThread: can use the pattern response.data.foreach { case (key, partitionData) => ... }

        26. KafkaApis.handleFetchRequest(): can use the pattern in #25 too.
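
        Minimal sketches of the wrappers suggested in points 20 and 24 (illustrative names; the committed code may differ):

        case class TopicAndPartition(topic: String, partition: Int)

        // Point 20: wrap (errorCode, offset) so fields are addressed by name rather
        // than by tuple position.
        case class ProducerResponseStatus(error: Short, offset: Long)

        // Point 24: a helper on the fetch response, assuming its data is keyed by
        // TopicAndPartition (hypothetical field and type names).
        case class FetchResponsePartitionDataSketch(error: Short, messages: Array[Byte])
        case class FetchResponseSketch(data: Map[TopicAndPartition, FetchResponsePartitionDataSketch]) {
          def errorCode(topic: String, partition: Int): Short =
            data(TopicAndPartition(topic, partition)).error
        }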

        Joel Koshy added a comment -

        Marking as blocker since I ended up changing the wire format.

        Joel Koshy added a comment -

        Overview of changes in v3:

        (3.1) - (20) - This actually caused the bulk of the changes in v3. I did this
        for just the topic-partition pairs in producer/consumer request handling;
        same for producer response status. There are a lot more places where we
        could move from tuples to case classes. I think (as mentioned on the mailing
        list) it would be good to do this as part of cleanup but we should defer
        that for later since such changes cut across a lot of files. Going forward I
        think this brings out a convention that we might want to follow. The "scala
        book" has a reasonable guideline. I'll send out an email to kafka-dev for
        discussion and add it to our coding convention depending on how that pans
        out.

        (3.2) (21)-(23) - Thanks for catching the javaapi issue. Couple of changes
        here:
        a - Added javaapi for FetchRequest. (I needed to provide both java/non-java
        fetchrequest to the simpleconsumer since FetchBuilder returns a scala
        FetchRequest.)
        b - Java map for all the Fetch/Produce request/response in the javaapi.
        c - Removed SyncProducer: agreed that is unnecessary since Producer supports
        both sync and async; made the hadoop producer code use the high level
        producer. I think that's safe - i.e., I don't see a good reason why anyone
        would "depend" on the data going to a single broker.
        d - Got rid of the unreferenced ProducerConsumerTestHarness in javaapi.
        e - Fixed the equals method in javaapi ProducerRequest; added one to
        FetchResponse - actually we can abstract this out into a trait, but that is
        a minor improvement that I punted on.
        f - Made the ProducerRequest use Java map in javaapi.
        g - (I did not add Java versions of ProducerResponse since the SyncProducer
        has been removed.)

        (3.3) (24) - added the helper method, although I don't think I'm using it
        anywhere.

        (3.4) - Got rid of OffsetDetail.

        For (25)(26) - I tried to use the pattern wherever I could, but may have
        missed a few.

        I did not rebase this (i.e., it still applies on r1381858). I'll rebase
        after we are done with reviews.

        Jun Rao added a comment -

        V3 looks good overall. Some additional comments:

        30. Remove javaapi.ProducerRequest.
        31. We should probably call TopicPartition TopicAndPartition.
        32. javaapi.SimpleConsumer: It's a bit confusing to have the Scala version of send here. Let's at least explain in a comment how to use it.

        Joel Koshy added a comment -

        30 - Why should it be removed?

        Joel Koshy added a comment -

        OK, never mind - I see it's because we removed SyncProducer and only need ProducerData. So I'll rebase now and upload the final patch in a bit.

        Joel Koshy added a comment -

        Here is the rebased patch. I also had to include a small edit to ReplicaFetcherThread to address the issue in KAFKA-517 which affects our system tests.

        Jun Rao added a comment -

        Thanks for patch v4. +1. Could you fix the following minor issues before checking in?

        41. Scala version of FetchResponse: We throw an exception in errorCode if the map key doesn't exist. To be consistent, we should do the same for messageSet and highWatermark (see the sketch below).

        42. PartitionStatus: Since this is a case class, there is no need to define requiredOffset as a val.

        43. DefaultEventHandler.serialize(): This is not introduced in this patch, but could you change error("Error serializing message " + t) to error("Error serializing message ", t)?

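        A sketch of point 41 (hypothetical field and type names): route all three accessors through one lookup so they fail the same way when the (topic, partition) key is missing:

        case class PartitionDataSketch(error: Short, messages: Array[Byte], hw: Long)

        case class FetchResponseAccessorsSketch(data: Map[(String, Int), PartitionDataSketch]) {
          private def partitionData(topic: String, partition: Int): PartitionDataSketch =
            data.getOrElse((topic, partition),
              throw new IllegalArgumentException("No partition (%s, %d) in fetch response".format(topic, partition)))

          def errorCode(topic: String, partition: Int): Short = partitionData(topic, partition).error
          def messageSet(topic: String, partition: Int): Array[Byte] = partitionData(topic, partition).messages
          def highWatermark(topic: String, partition: Int): Long = partitionData(topic, partition).hw
        }
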
        Joel Koshy added a comment -

        Thanks for the review. Checked in to 0.8 after addressing the minor issues.

        Honghai Chen added a comment -

        Why did this fix add this line of code?
        We sometimes got 2 responses for one request. Why would we fall into this situation, and will there be duplicate data in Kafka?

        https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=commitdiff;h=b688c3ba045df340bc32caa40ba1909eddbcbec5

        + if (response.status.size != producerRequest.data.size)
        +   throw new KafkaException("Incomplete response (%s) for producer request (%s)"
        +     .format(response, producerRequest))

        Jun Rao added a comment -

        The client should always receive one response per request. Are you seeing otherwise?

        Honghai Chen added a comment -

        Yes, after adding more information to the error message, we sometimes see two responses for one request.

        Jun Rao added a comment -

        Which version of Kafka are you using? Is that easily reproducible?

        Joel Koshy added a comment -

        You mean two responses to the caller of send? I don't see why those two lines would cause two responses. Can you explain further and provide steps to reproduce if there really is an issue? Do you see errors/warns in the logs?

        Honghai Chen added a comment -

        It's not that those two lines cause two responses.
        We just hit those two lines while running.
        After changing that line to output the values of response.status.size and producerRequest.data.size, we found that sometimes response.status.size = 2 and producerRequest.data.size = 1.

        What I want to ask is: why was this line added initially? When will response.status.size != producerRequest.data.size happen?

        Joel Koshy added a comment -

        Yeah, I see your point. That's interesting - I actually don't remember why that was added, but it appears there must have been a legitimate reason (since you ran into it).

        Since you are able to reproduce it, can you print the full original request and the response itself? They should be in the exception that is thrown.

        Also, what is your broker's Kafka version, and what is the version of the producer? Is it the same?

        Honghai Chen added a comment -

        This situation happens under the following scenario:
        One broker is the leader for several partitions, for example 3. When we send one message set that has messages for all 3 partitions of this broker, response.status.size is 3 and producerRequest.data.size is 1, so it hits this exception. Any idea for a fix? Do we need to compare response.status.size with messagesPerTopic.Count instead of producerRequest.data.size?

        private def send(brokerId: Int,
                         messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
          if (brokerId < 0) {
            warn("Failed to send data since partitions %s don't have a leader"
              .format(messagesPerTopic.map(_._1).mkString(",")))
            messagesPerTopic.keys.toSeq
          } else if (messagesPerTopic.size > 0) {
            val currentCorrelationId = correlationId.getAndIncrement
            val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
              config.requestTimeoutMs, messagesPerTopic)
            var failedTopicPartitions = Seq.empty[TopicAndPartition]
            try {
              val syncProducer = producerPool.getProducer(brokerId)
              debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
                .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId,
                        syncProducer.config.host, syncProducer.config.port))
              val response = syncProducer.send(producerRequest)
              debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
                .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId,
                        syncProducer.config.host, syncProducer.config.port))
              if (response != null) {
                if (response.status.size != producerRequest.data.size)
                  throw new KafkaException("Incomplete response (%s) for producer request (%s)"
                    .format(response, producerRequest))
                // ... (rest of the method omitted in the excerpt)

        Joel Koshy added a comment -

        If there are three partitions, then there will be three message-sets. i.e., producerRequest.data.size will be three, not one. Can you give example application code that reproduces the issue that you are seeing?

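        To make that concrete, a tiny hypothetical example (illustrative names only) of why the sizes line up when the request data is keyed per partition:

        case class TopicAndPartition(topic: String, partition: Int)

        // Three partitions on one broker means three entries in the request data,
        // and the broker returns one status entry per partition.
        val messagesPerPartition = Map(
          TopicAndPartition("t", 0) -> "message-set-0",
          TopicAndPartition("t", 1) -> "message-set-1",
          TopicAndPartition("t", 2) -> "message-set-2")

        assert(messagesPerPartition.size == 3) // matches response.status.size from the broker
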
        Honghai Chen added a comment -

        Many thanks for your help.
        After debugging and testing, it seems I can't hit that exception.
        We're actually using a C# client derived from https://github.com/precog/kafka/tree/master/clients/csharp/src/Kafka/Kafka.Client , and after debugging and comparing its code with the Java version, we confirmed that it's a bug in the C# code.
        In the Java version, when creating the ProducerRequest, it sets producerRequest.data to messagesPerTopic and only groups by topic just before sending the binary data.
        But in our C# version, it groups first and sets producerRequest.data to a dictionary of <Topic, Data>, so we hit this exception wrongly. We have fixed it.

        Many thanks for your time.
        Anyway, I can't find our related open-source version on the internet; our version has DefaultCallbackHandler.cs, but the version at https://github.com/precog/kafka/tree/master/clients/csharp/src/Kafka/Kafka.Client does not, so I can't provide a link here.

        The Java links:
        https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/api/ProducerRequest.scala;h=570b2da1d865086f9830aa919a49063abbbe574d;hb=HEAD
        https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala;h=821901e4f434dfd9eec6eceabfc2e1e65507a57c;hb=HEAD#l260


          People

          • Assignee:
            Joel Koshy
          • Reporter:
            Joel Koshy
          • Votes:
            0
          • Watchers:
            3

            Dates

            • Created:
              Updated:
              Resolved:
