
KAFKA-49: Add acknowledgement to the produce request

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request.

      Attachments

      1. KAFKA-49-v3.patch
        48 kB
        Prashanth Menon
      2. KAFKA-49-v2.patch
        49 kB
        Prashanth Menon
      3. KAFKA-49-v1.patch
        46 kB
        Prashanth Menon
      4. KAFKA-49-continued-v2.patch
        35 kB
        Prashanth Menon
      5. KAFKA-49-continued.patch
        35 kB
        Prashanth Menon

          Activity

          Jay Kreps added a comment -

          If there are any other producer-related changes coming we should try to batch these together at the same time to avoid having to update clients too often.

          Prashanth Menon added a comment -

          So I've started a little work on this. Looks to me like the ProducerRequest is going to need an additional "acknowledge" boolean field (default false) which we send along with the rest of the fields. On the producer side, there are a couple of options:

          1. We can leave the producer API as it is and have producers (both sync and async) acknowledge all or none of their messages. This behaviour would be driven by a configuration field or a class parameter.
          2. We add an acknowledgement parameter (default false) to the producer send API and leave the remaining behaviour the same. The producer then handles waiting or not waiting for acks on a per-message basis.

          I would prefer the second option as it's simple to do and gives the option to clients. Waiting on the producing end shouldn't be an issue. On the broker side, it becomes easy as we just send a boolean response after handling the request (ditto for the multi-produce request).
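
          A rough sketch of what that second option could look like at the API level (illustrative Scala with a hypothetical signature, not the one that was eventually committed):

            // Per-call acknowledgement flag, defaulting to today's fire-and-forget behaviour.
            trait SyncProducerSketch {
              def send(topic: String, partition: Int, messages: Seq[Array[Byte]], acknowledge: Boolean = false): Unit
            }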

          Did anyone else have any thoughts on this?

          Jun Rao added a comment -

          Prashanth, thanks for getting started on this. I agree with your second approach. Basically, add a new parameter in SyncProducer.send/multisend to indicate whether an ack is needed or not. The high level producer can then set that parameter based on ProducerConfig (probably true for sync mode and false for async mode).

          Another question is what kind of ack does the broker send back. A simple approach is to send back a boolean. Another possibility is to return for each partition in the produce request, the latest offset after the request is served. Some clients could potentially make use of the returned offset.
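
          A minimal sketch of the offset-carrying ack described above (illustrative Scala with hypothetical type names; the real wire format was settled later in the thread):

            case class TopicPartition(topic: String, partition: Int)

            // One error code and one offset per partition in the produce request,
            // so clients that care can read the new log end offset back.
            case class ProducerAck(errors: Map[TopicPartition, Short], offsets: Map[TopicPartition, Long])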

          Taylor Gautier added a comment -

          I second returning the new offset.

          Jay Kreps added a comment -

          +1 for returning the offset

          If we are changing the request format it would also be good to think it through in some detail to get it right, since these kinds of API changes are harder to roll out than to make. Some questions:
          1. Are we trying to maintain compatibility for this change? If so we should bump up the request id number and ignore the new fields for the old request id. This is not too hard, but requires a little extra work.
          2. Currently we have ProduceRequest and MultiProducerRequest and FetchRequest and MultiFetchRequest. I recommend we rename MultiProducerRequest to ProduceRequest and delete the existing ProduceRequest. The current ProduceRequest has no advantages over MultiProducerRequest, and having both means each change we make has to be done for both. The two variations are just there for historical reasons - originally there was no multi* version of the requests and we added that later. I recommend we do the same for FetchRequest/MultiFetchRequest. This will make our lives simpler going forward.
          3. MultiProducerRequest has the format [(topic, partition, messages), (topic, partition, messages), ...]. This is because it is just a bunch of repeated ProducerRequests. This is really inefficient, though, as a common case is that we are producing a bunch of messages for different partitions under the same topic (i.e. if we are doing key-based partitioning). It would be better for the format to be [(topic, [(partition, messages), ...]), (topic, [(partition, messages), ...]), ...], so that each topic is only given once (sketched below).

          I would like to get a quick consensus on the desired format of the produce and fetch requests up front, then we can break this into appropriate subtasks so we don't expand the scope of Prashanth's work too much.
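
          To make point 3 concrete, here is a rough sketch of the grouped layout (illustrative Scala with made-up type names; not the format that was eventually committed):

            // Flat shape carried today: Seq[(topic, partition, messages)], repeating the topic.
            // Grouped shape: each topic listed once, with its partitions nested underneath.
            case class PartitionData(partition: Int, messages: Seq[Array[Byte]])
            case class TopicData(topic: String, partitions: Seq[PartitionData])
            case class ProduceRequestSketch(topics: Seq[TopicData])

            // Collating a flat batch into the grouped shape:
            def group(flat: Seq[(String, Int, Seq[Array[Byte]])]): ProduceRequestSketch =
              ProduceRequestSketch(
                flat.groupBy(_._1).toSeq.map { case (topic, entries) =>
                  TopicData(topic, entries.map { case (_, p, msgs) => PartitionData(p, msgs) })
                }
              )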

          Jun Rao added a comment -

          Both 2 and 3 make sense.

          If we do this in a bug fix release in 0.7, we probably need to maintain backward compatibility. If we do this as part of the replication work, we probably can make a non-backward compatible change. My preference is the latter.

          Taylor Gautier added a comment -

          Generally I think it's a good idea to have a version embedded in the protocol. This allows for backwards and forwards compatibility. In a sense the request id already works as such, so it's partly a matter of semantics, but the only way to identify that there are multiple versions of the same request is to have some kind of external mapping that says id 2 and id 8 are really the same request, just different versions.

          OTOH, if you use a version, you can then have:

          id 2 version 1
          id 2 version 2

          etc. and this is imho easier to manage.

          Usually, the version should in fact be the first value in the protocol, so that you never have formatting issues that lie outside the realm of the versioned data.

          Currently, all requests are preceded by a header, which contains the length of the data. This is where I would start; we should strive for either:

          version: 2 bytes
          length: 4 bytes

          or

          length: 4 bytes
          version: 2 bytes

          Note that the message request already has a way to represent versions, using the magic field, but honestly I find it a little bit non explicit for my taste.

          I would also include a 64-bit "flags" area that will allow for future flags to be set to indicate various things.

          So, if I were to suggest a standard header for requests and responses it would look like:

          length: 2 bytes
          version: 2 bytes
          request id: 2 bytes
          flags: 4 bytes
          payload: n bytes
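
          A rough sketch of how such a header could be laid out on the wire (field widths taken verbatim from this comment; illustrative Scala only, not the header format Kafka adopted):

            import java.nio.ByteBuffer

            // length (2) | version (2) | request id (2) | flags (4) | payload (n)
            def encodeHeader(version: Short, requestId: Short, flags: Int, payload: Array[Byte]): ByteBuffer = {
              val buf = ByteBuffer.allocate(2 + 2 + 2 + 4 + payload.length)
              buf.putShort((2 + 2 + 4 + payload.length).toShort) // bytes remaining after the length field
              buf.putShort(version)
              buf.putShort(requestId)
              buf.putInt(flags)
              buf.put(payload)
              buf.flip()
              buf
            }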

          Taylor Gautier added a comment -

          It could be possible to split the current request id - 2 bytes - into a version field and an id field one byte long each. Assuming there's no need for more than 256 verbs or more than 256 versions, this would probably work within the current binary protocol and give backwards compatibility to 0.6 and 0.7 clients...
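
          A sketch of that packing, assuming the high byte carries the version and the low byte carries the request verb (illustrative only):

            // Pack a 1-byte version and a 1-byte verb into the existing 2-byte request id field.
            def pack(version: Int, verb: Int): Short =
              (((version & 0xFF) << 8) | (verb & 0xFF)).toShort

            def unpack(requestId: Short): (Int, Int) =
              ((requestId >> 8) & 0xFF, requestId & 0xFF)

            // pack(1, 2) and pack(2, 2) are then the same verb at two different versions.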

          Jay Kreps added a comment -

          Hi Taylor, as you say the request id was meant to be the version. However in retrospect I do think I like the idea of separating the request and the version of the request. I agree it would be nice to split this.

          I think the open question here is whether we should try to maintain backwards compatibility for the next major release. It would probably be very convenient for us at code-writing time not to, but is more painful at rollout time.

          Taylor Gautier added a comment -

          Well, how many versions do you think you want? Maybe we could split the request field up into say the first 5 or 6 bits instead of 8 for the versions.

          Jay Kreps added a comment -

          So guys, my thought is the best plan of attack would be to fully think through the protocol changes for all the use cases we currently know about and do them all at once (even if the features the new fields support aren't yet available). This will avoid doing this in lots of little patches that all conflict, and it will make us think things through holistically.

          I created a wiki with some of the outstanding ideas, I am going to move this discussion to the main dev and user lists to get broader feedback. It would be good if people could give their thoughts there so we can try to get these changes right.

          Neha Narkhede added a comment -

          KAFKA-240 implements the new wire format for producer and consumer. Since this JIRA requires the new format, it depends on KAFKA-240

          Neha Narkhede added a comment - edited

          KAFKA-239 is complete and KAFKA-240 is almost there.

          Prashanth, in your comment above, you've mentioned you've started work on this. If so, mind assigning this JIRA to yourself ? This looks like the next JIRA to work on after KAFKA-240 is in. (https://issues.apache.org/jira/browse/KAFKA-50?focusedCommentId=13180712&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13180712)

          Prashanth Menon added a comment -

          Sure, but I actually can't change assignments. Could be a privileges issue?

          Neha Narkhede added a comment -

          Looks like you didn't exist in the Apache Kafka JIRA list. Added you there, also assigned this JIRA to you

          Prashanth Menon added a comment -

          Hey all. Getting the acknowledgements done was fairly straightforward - I've "todo-ed" actually waiting for replica acknowledgements as part of kafka-44 when it introduces true replicas and partitions. Having the sync producer wait for a response when it requires acknowledgements was trivial. My question is what to do with the async producer. Was the intent to perform some logic in the default event handler, or to expect clients to write custom event handlers that deal with errors? Should we bubble up the response to the producer level?

          Jun Rao added a comment -

          Prashanth,

          That's a good question. Initially, I was just thinking that for async producers, if we get an error during send (after retries), we will just log the error without telling the client. Currently, the event handler is not really extensible. I can imagine that we add some kind of callback to return those errors. The question is what the client will do on those errors. Will it resend? If so, we will need to pass the failed requests through the callback too. I am curious about how other messaging systems like ActiveMQ handle this in the async mode.

          Prashanth Menon added a comment - edited

          If I remember right, HornetQ allows you to implement a callback interface to be notified of message acknowledgements. We could do something similar, passing back the request and response for the erroneous parts. To your second point, I suppose it depends on the error. Some may be logged and skipped, some will require a refresh of topic metadata. The default behaviour should cover the base cases.

          Curious to hear other thoughts

          Jun Rao added a comment -

          The current DefaultEventHandler already refreshes topic metadata on retries. So, if we return any failed request to the client, there is probably not much the client can do, except for logging it. In any case, we should use a separate JIRA to track whether we need any async callback on the producer side.

          Prashanth Menon added a comment -

          Sounds good to me. Doing some additional work on DefaultEventHandler, I noticed something off in the retry logic that I'd like to get confirmed. Consider the case where I've got data destined for more than one broker, say three.

          • Enter handleSerializedData()
          • Partitioning and collating makes a map with three key/value pairs (broker -> topic partition data and messages).
          • Enter for loop
          • Assume the first send works on the first try for broker #1.
          • Next iteration, the second send to broker #2 fails on the first try, we fall into the retry loop and recurse into handleSerializedData with requiredRetries = 0.
          • In handleSerializedData
          • This time, the partitioned data will have one key/value pair for the single broker (broker #2) we're attempting to resend data to.
          • Enter for loop
          • Attempt to send data to broker #2, the send succeeds
          • We exhaust the map entries and exit the for loop.
          • We return to the retry loop for retry=1 on broker #2 in the catch block.
          • The previous send succeeded on the first try and now there's the "return" statement. This exits the function, but we have one more broker (broker #3) to send data to.

          Does the flow sound about right? I think what needs to happen is to set a flag and break the retry while loop after a successful retry. Then we check the flag after the loop and either throw the exception or continue the outer for loop.

          Am I crazy? Am I missing something in my sleep-deprived state here?

          Jun Rao added a comment -

          Prashanth,

          Yes, that's actually a real bug. Instead of returning inside the retry loop, we should just set a boolean to indicate that a send has succeeded. At the end, we will throw FailedToSendMessageException if the boolean is not set. Otherwise, we will continue with the for loop. Could you file a separate JIRA to fix that? Thanks for catching the bug.
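
          A minimal sketch of the flag-and-break fix described above (illustrative Scala with a hypothetical method shape; not the actual DefaultEventHandler code):

            // Instead of returning from inside the retry loop (which skips the remaining brokers),
            // record whether any attempt for this broker succeeded and only then move on.
            def sendWithRetries[T](perBroker: Map[Int, T], maxRetries: Int)(send: (Int, T) => Unit): Unit = {
              for ((broker, data) <- perBroker) {
                var succeeded = false
                var attempt = 0
                while (!succeeded && attempt <= maxRetries) {
                  try {
                    send(broker, data)
                    succeeded = true // leaves the retry loop, NOT the outer broker loop
                  } catch {
                    case _: Exception => attempt += 1
                  }
                }
                if (!succeeded)
                  throw new RuntimeException( // the real code would throw FailedToSendMessageException
                    "Failed to send to broker " + broker + " after " + maxRetries + " retries")
              }
            }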

          Prashanth Menon added a comment -

          Done, created KAFKA-295. Expect patch for this jira later tonight.

          Prashanth Menon added a comment -

          I've attached a patch for this. A few comments:

          • Modified ProducerResponse
          • Broker does not actually wait for replica ACKS. This will be done with KAFKA-44.
          • Sync producer has been modified to wait for response from broker. Async producer isn't aware of request level errors, this will require a separate ticket.
          • Some general cleanup on producer request, async producer and removal of MultiProduce request key.

          One oddity: since we use Arrays in the request and response, the case class equality/hashCode logic breaks, because Java arrays only compare by reference. We should probably make them Seqs (separate JIRA?) or WrappedArrays.
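
          The Array-equality issue is easy to reproduce: a case class holding an Array falls back to reference equality, while a Seq compares element-wise (illustrative snippet):

            case class WithArray(xs: Array[Int])
            case class WithSeq(xs: Seq[Int])

            val a = WithArray(Array(1, 2)) == WithArray(Array(1, 2)) // false: Array uses reference equality
            val b = WithSeq(Seq(1, 2)) == WithSeq(Seq(1, 2))         // true: Seq compares elements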

          Jun Rao added a comment -

          Prashanth,

          Thanks for the patch. Overall, it looks pretty good. Some comments:
          1. KafkaApis: Even when the produce request requires no ack, we will still need to send an error code back. So we should always send a ProduceResponse. When no ack is specified, we probably don't need to send offsets back.
          2. ProduceRequest.getNumMessages: rename it to something like getNumTopicPartitions
          3. AsyncProducerTest.testDefaultHandlerRetryLogic: doesn't really test retry
          4. SyncProducerTest.testProduceBlocksWhenRequired: Use createTopic instead of creating the ZK path directly.
          5. I agree that it's probably better to use Seq in our requests/responses, instead of Array. Then we need a Java version to convert a Seq to a Java array and vice versa. Please open a separate JIRA to track this.

          Prashanth Menon added a comment -

          Thanks for the pointers!

          1. Hmmm, do you propose returning an empty offsets array back to the client when no ack is required? That seems perfectly reasonable since the broker can't make guarantees as to the offsets; but it does feel somehow incongruous since one would expect the errors and offsets array sizes to be equal. I'm fine with the idea as long as it's agreed in the wire format. If I've completely missed the point, forgive me!
          2. Done.
          3. Done. Wow, that wasn't supposed to be included. It was part of my sanity check for the incorrect retry logic I mentioned earlier.
          4. Done.
          5. Done.

          Jun Rao added a comment -

          1. Or we can just treat no acks the same as ack=1 for now.

          Prashanth Menon added a comment - edited

          A few more concerns popped up as a result of making the send in SyncProducer blocking.

          1. Edit: So it turns out that performing reads through the channel in SyncProducer, the way we do now, won't trigger socket timeouts even though we set one, and a read can block forever, which is bad news (see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and http://stackoverflow.com/questions/2866557/timeout-for-socketchannel for a workaround). The latter post has a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads (see the sketch after this list). The other option is using non-blocking sockets with selectors, which is more complex.
          2. It is conceivable that a broker listed in the replica set /brokers/topics/[topic]/partitions/[partition]/replicas is offline or shut down, which means its ephemeral entry is missing in ZK. A problem then arises when an active broker attempts to pull metadata and node information for topics those brokers host, since AdminUtils assumes any broker in the AR or ISR must have a path/info under /brokers/ids/[brokerId] in ZK; because they don't, a NoNodeException is thrown. A corner case for sure, but something that should probably be fixed.
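
          A sketch of the workaround from the linked Stack Overflow post: wrap the socket's input stream in a ReadableByteChannel so reads honour SO_TIMEOUT instead of blocking forever (illustrative only; not the code that went into SyncProducer):

            import java.net.Socket
            import java.nio.ByteBuffer
            import java.nio.channels.{Channels, ReadableByteChannel}

            // Reads through a SocketChannel ignore setSoTimeout; reads through a channel built on the
            // socket's InputStream honour it and throw SocketTimeoutException instead of hanging.
            def timeoutAwareChannel(socket: Socket, timeoutMs: Int): ReadableByteChannel = {
              socket.setSoTimeout(timeoutMs)
              Channels.newChannel(socket.getInputStream())
            }

            def readFully(channel: ReadableByteChannel, size: Int): ByteBuffer = {
              val buf = ByteBuffer.allocate(size)
              while (buf.hasRemaining() && channel.read(buf) >= 0) {} // may throw SocketTimeoutException
              buf.flip()
              buf
            }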

          Prashanth Menon added a comment -

          I've attached an updated patch.

          1. ProduceRequest always receives a ProducerResponse. If acks=0, offsets in the response are treated as if acks=1, meaning only the leader has acked.
          2. SyncProducer blocks waiting for a response from a ProducerRequest.
          3. I've commented out ProducerTest.testZKSendWithDeadBroker since it relies on SyncProducer logic that will need to change. It also will need to be rewritten with the changes coming in as part of KAFKA-45 leader election logic.

          Let me know if I've missed anything!

          Jun Rao added a comment -

          Prashanth,

          I am ready to commit your patch. A couple things:

          1. Your patch added a new class ResponseHandler, but it's not used. Should that be removed?
          2. Your concern #1 is valid. Could you create another jira to track this?
          3. For your concern #2, it's ok for getMetadataApi to return an empty leader in a transition state. The client will simply back off a bit and call getMetadataApi again.

          Prashanth Menon added a comment -

          Hi Jun, I've attached a new patch.

          1. Yes, I've removed it.
          2. Done.
          3. You are correct in the case of leaders, but I believe the problem stands when pulling topic metadata with at least one offline broker listed in the assigned replicas.

          Jun Rao added a comment -

          Thanks for the patch Prashanth. Just committed to 0.7 branch.

          For 3, if a broker is offline, then eventually it will not be in ISR. In the transition state, we could have an ISR broker without matching host and port.

          Neha Narkhede added a comment -

          Sorry for visiting this late. I have a few questions about producer ACK.

          1. In DefaultEventHandler, the producer ACK is not used. Shouldn't it be used to figure out if the send operation needs to be retried ?
          2. In DefaultEventHandler, should the producer wait for ACK and timeout if it doesn't receive one ?
          3. In KafkaApis, why doesn't the broker send an error back to the producer if it received a producer request for a partition that is not hosted on that broker ?

          Prashanth Menon added a comment -

          Better late than never. A second review is always a plus! To your points:

          1. Absolutely, this was overlooked. Expect patch later tonight or tomorrow.
          2. The DefaultEventHandler does wait for the ack by waiting for the response. Unfortunately, the current SyncProducer doesn't time out correctly, for which KAFKA-305 was created.
          3. KafkaApis does not explicitly do the check, instead relying on LogManager which currently does. It makes sense to move that piece of logic along with the TODO from LogManager into KafkaApis for clarity and to separate ZK from LogManager.

          What do you think?

          Neha Narkhede added a comment -

          1. I'm refactoring some part of DefaultEventHandler as part of KAFKA-300. I'll upload a patch tonight. You can either choose to work off of the changed code or not. Your call.
          2. Sounds good
          3. In handleProduceRequest, the logManager.append() throws InvalidPartitionException when it receives a request for a partition that it does not own. Does it make sense to send an ACK to the producer with an error code like NotLeaderForPartitionException ?

          Neha Narkhede added a comment -

          Reopening this issue to address some review suggestions and to fix KAFKA-305 as part of this JIRA

          Neha Narkhede added a comment -

          Prashanth, thanks for resolving KAFKA-305 ! Would you be up for finishing up the remaining work on this ? It seems like a good idea to complete earlier JIRAs, before moving to the later ones.

          Prashanth Menon added a comment -

          Okay, I've attached a patch that should take care of the outstanding items. A couple of points:

          1. KafkaApis now checks whether a partition has a leader on the broker. It uses KafkaZooKeeper to check this, for now, but should probably rely on ReplicaManager and the Replica itself to determine this. KAFKA-46 should take care of this.
          2. I've removed the random partition check on the server-side since the partitioning is done in default event handler. Producers should know which broker a partition belongs to.
          3. I've added a new NotLeaderForPartitionException and added it to ErrorMapping so clients can receive it.
          4. DefaultEventHandler.send now returns a list of topic/partition tuples that represents those messages that need to be resent due to an error.
          5. Due to the changes in KafkaApis produce check, some of the tests have been modified to ensure topics are in ZK and to wait for leadership.

          I think that covers all, please point out any issues!

          Neha Narkhede added a comment -

          +1. Thanks for incorporating the review suggestions !

          Jun Rao added a comment -

          Prashanth,

          Patch looks good: Just one minor thing.

          6. Unused imports: KafkaZookeeper

          While looking at the patch, I realized that there are a couple of other things that we will need to follow up.

          a. In DefaultEventHandler, it seems that we rely on the fact that broker.id is a non-negative integer. However, we don't enforce that in broker startup.
          b. With the create topic ddl, some of the broker configs like topic.partition.count.map probably don't make sense anymore.

          I will create a jira for each item to follow up.

          Prashanth Menon added a comment -

          Thanks for the review! I've attached the newest patch with #6 resolved.

          Jun Rao added a comment -

          Prashanth,

          Thanks for the patch. It seems that kafka-48 is almost ready. Since that's a relatively large patch, I will commit your patch after kafka-48 is committed.

          Jun Rao added a comment -

          Prashanth,

          Now that kafka-48 is committed to 0.8, we can commit your patch. Since you are a committer now, could you commit this yourself?

          Prashanth Menon added a comment -

          Sorry for the delay everyone. I'm planning to block off some time this weekend to commit this patch, and hoping I don't run into any access/permissions issues

          Prashanth Menon added a comment -

          Excellent, committed this to 0.8.


            People

            • Assignee:
              Prashanth Menon
            • Reporter:
              Jun Rao
            • Votes:
              0
            • Watchers:
              6
