KAFKA-955: After a leader change, messages sent with ack=0 are lost

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None

      Description

      If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed until its next metadata refresh.

      The broker receiving the message, which is no longer the leader, logs a message like this:

      Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741

      This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change.

      A possible solution would be for a broker that receives a message for a topic it is no longer the leader for (when the ack level is 0) to silently forward the message to the current leader.
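
      For reference, a producer gets into this situation simply by running with the acks setting at 0. A minimal 0.8-style producer sketch is below; the broker address and topic are placeholders, and the property names assume the stock Scala producer shipped with 0.8:

      import java.util.Properties
      import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

      val props = new Properties()
      props.put("metadata.broker.list", "broker1:9092")               // placeholder broker
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "0")                         // fire-and-forget: no broker ack

      val producer = new Producer[String, String](new ProducerConfig(props))
      producer.send(new KeyedMessage[String, String]("mytopic", "hello"))
      // With acks=0 this send reports no error even if the receiving broker is no longer
      // the leader for mytopic; the message is silently lost until the next metadata refresh.
      producer.close()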

      Attachments

      1. KAFKA-955.v1.patch
        3 kB
        Guozhang Wang
      2. KAFKA-955.v1.patch
        6 kB
        Guozhang Wang
      3. KAFKA-955.v2.patch
        6 kB
        Guozhang Wang
      4. KAFKA-955.v3.patch
        6 kB
        Guozhang Wang
      5. KAFKA-955.v4.patch
        6 kB
        Guozhang Wang
      6. KAFKA-955.v5.patch
        5 kB
        Guozhang Wang
      7. KAFKA-955.v6.patch
        9 kB
        Guozhang Wang
      8. KAFKA-955.v7.patch
        10 kB
        Guozhang Wang
      9. KAFKA-955-followup.v1.patch
        1 kB
        Guozhang Wang

        Activity

        Jason Rosenberg added a comment - edited

        Here's a related scenario that I see after a rolling restart of my brokers (using ack=0).

        Looking back at my logs, I'm wondering whether a producer will reuse the same socket to send data to the same broker for multiple topics (I'm guessing yes). In that case, it looks like I'm seeing this scenario:

        1. producer1 is happily sending messages for topicX and topicY to serverA (serverA is the leader for both topics, only 1 partition for each topic for simplicity).
        2. serverA is restarted, and in the process, serverB becomes the new leader for both topicX and topicY.
        3. producer1 decides to send a new message to topicX to serverA.
        3a. this results in an exception ("Connection reset by peer"). producer1's connection to serverA is invalidated.
        3b. producer1 makes a new metadata request for topicX, and learns that serverB is now the leader for topicX.
        3c. producer1 resends the message to topicX, on serverB.
        4. producer1 decides to send a new message to topicY to serverA.
        4a. producer1 notes that its socket to serverA is invalid, so it creates a new connection to serverA.
        4b. producer1 successfully sends its message to serverA (without realizing that serverA is no longer the leader for topicY).
        4c. serverA logs to its console:
        2013-06-23 08:28:46,770 WARN [kafka-request-handler-2] server.KafkaApis - [KafkaApi-508818741] Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
        5. producer1 continues to send messages for topicY to serverA, and serverA continues to log the same messages.
        6. 10 minutes later, producer1 decides to update its metadata for topicY, and learns that serverB is now the leader for topicY.
        7. the warning messages finally stop in the console for serverA.

        I am pretty sure this scenario, or one very close to it, is what I'm seeing in my logs after doing a rolling restart with controlled shutdown.

        I think this scenario makes the issue more severe than just a problem with controlled restart and ack=0.

        Jun Rao added a comment - edited

        It seems there are various ways this can happen. (a) In the above scenario, after the leaders fail over, topicX causes new sockets to be established. Then topicY uses the newly established socket without realizing that the leader for topicY has changed. (b) When we fetch the metadata for a topic, we fetch the metadata for all partitions. Say we never get to send any data to a particular partition; the socket for that partition is not established, since SyncProducer makes socket connections lazily on first send. Then the leader for the partition changes. Finally, the producer sends a message to that partition. Now a socket is established to the wrong leader without the producer realizing it.

        In general, if we hit any error for produce requests with ack=0, the producer currently won't notice it, for example if the broker hits a MessageSizeTooLargeException or any other unexpected exception. In those cases, forwarding the request will not help. Also, forwarding requests would complicate the logic in the broker, since we would have to figure out the other broker's host and port and potentially cache socket connections to other brokers.

        An alternative solution is to simply close the socket connection when we hit any error for produce requests with ack=0. This way, the producer will realize the error on next send.

        Guozhang Wang added a comment -

        Following the close-socket approach, I propose the following change (a rough sketch follows the list):

        1. Add a closeSocket: Boolean field to the Response class.

        2. In KafkaApis.handleProducerRequest, if requiredAcks == 0, check whether numPartitionsInError != 0. If so, set closeSocket to true in the returned response.

        3. In SocketServer.Processor.processNewResponses, if curr.responseSend == null, check whether closeSocket == true. If so, log that the socket is being closed and close the key.
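
        A minimal, self-contained sketch of this proposal (the class and method names below are simplified stand-ins, not the actual 0.8 kafka.network/kafka.server code):

        object CloseSocketSketch {
          // Stand-in for RequestChannel.Response: responseSend == None models the ack=0 case
          // where there is no response body; closeSocket tells the socket server to drop the
          // connection instead of keeping it open.
          case class Response(responseSend: Option[String], closeSocket: Boolean = false)

          // Stand-in for KafkaApis.handleProducerRequest when requiredAcks == 0.
          def handleAckZeroProduce(numPartitionsInError: Int): Response =
            Response(responseSend = None, closeSocket = numPartitionsInError != 0)

          // Stand-in for SocketServer.Processor.processNewResponses.
          def processResponse(r: Response): String = r match {
            case Response(None, true)  => "close connection"  // error under ack=0: kill the socket
            case Response(None, false) => "read more"         // nothing to send; keep the socket open
            case Response(Some(_), _)  => "send response"     // normal ack != 0 path
          }

          def main(args: Array[String]): Unit = {
            println(processResponse(handleAckZeroProduce(numPartitionsInError = 2))) // close connection
            println(processResponse(handleAckZeroProduce(numPartitionsInError = 0))) // read more
          }
        }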

        Guozhang Wang added a comment -

        Added a case for ack=0 in testSendWithDeadBroker; it passed.

        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. SocketServer: We should call updateRequestMetrics even when we close the socket. Otherwise, total time will be broken for that request.

        2. ProducerTest: Let's add a new unit test instead of piggybacking on the existing one. What we can do is create a sync producer and send a produce request with ack=0 that will introduce an error (e.g., a message larger than the max size). After that, we can verify that the underlying socket is closed.

        3. KafkaApi: In the debug logging, why not log the whole producer request?

        Guozhang Wang added a comment -

        Thanks for the comments Jun.

        1,2,3. Done.

        Jun Rao added a comment -

        Thanks for patch v2. Some more comments.

        20. testSendWithAckZeroDeadBroker(): I am not sure the unit test does what you want. First of all, setup() will always start brokers for each test unless you explicitly shut them down, so in this test the brokers are not dead. Second, the test doesn't really test that the socket is closed after the error. I suggest that we add a new test in SyncProducerTest: we send a request with ack=0 containing a large message, then try to send a new request, and we should hit a socket I/O exception. We may have to wait for some time between the two requests.

        Guozhang Wang added a comment -

        Sorry for the misleading name; I did not shut down the broker but instead sent a large message to it to trigger the MessageSizeTooLargeException. The test should be named testSendTooLargeMessageWithAckZero.

        I will use SyncProducer instead of Producer in this test, send a normal message to the broker after this one, and expect it to fail due to a socket I/O exception.

        Guozhang Wang added a comment -

        Added testMessageSizeTooLargeWithAckZero to SyncProducerTest, which:

        1. First sends a large message that causes the MessageSizeTooLargeException and hence closes the socket; this message is silently dropped and lost.

        2. Then sends another large message whose size exceeds the buffer size, so that the socket buffer is flushed immediately; this send should fail since the socket has been closed.

        Magnus Edenhill added a comment -

        If the producer is sending messages through the same broker for other topic+partitions that did not have a leader change, they will also be affected by the socket close, resulting in lost messages.

        It would be better if the broker notified all connected clients of broker changes (leader change, broker add/delete, topic add/delete) by sending an unsolicited MetadataResponse message (with corrid 0), or by some other means.

        This would propagate topology changes in a faster and less intrusive way.

        Guozhang Wang added a comment -

        Hello Magnus,

        1. Under ack=0, the producer does not expect any response to a produce request, and it does not listen for connections from the broker either. So actively sending a MetadataResponse would not work: producers only expect a MetadataResponse when they have sent a MetadataRequest.

        2. When we close the socket, the producer is notified, refreshes its metadata, and retries. Since by default each produce request is retried multiple times before it gets dropped, the current approach would not cause lost messages.

        Magnus Edenhill added a comment -

        Hi Guozhang,

        I understand that you might not want to introduce new message semantics at this point of the 0.8 beta, but it won't get easier after the release.

        My proposal is a change to the protocol definition to allow unsolicited metadata response messages to be sent from the broker. This would of course require changes in most clients, but a very small one for those that are not interested in keeping their leader cache up to date.

        Consider a producer forwarding >100k msgs/s for a number of topics to a broker that suddenly drops the connection because one of those topics changed leader: the producer message queue will quickly build up and might start dropping messages (for topics that didn't lose their leader) due to local queue thresholds, or recover very slowly if the current message rate is close to the maximum throughput.

        In my mind, closing the socket because one topic+partition changed leader is a very intrusive way to signal an event for a subset of the communication, and it should instead be fixed properly with an unsolicited metadata response message.

        The unsolicited metadata response message is useful for other scenarios as well: new brokers and topics being added, for instance.

        My two cents on the topic, thank you.

        Jun Rao added a comment -

        Magnus, thanks for your comment. What you suggested is interesting and could be a more effective way of communicating between the producer and the broker. It does require that the producer be able to receive requests initiated by the broker. We do plan to make the producer-side processing selector based for efficiency reasons; however, this will be a post-0.8 item, and we could consider your suggestion then. Regarding your concern about dropped messages, my take is the following: if a client chooses not to receive an ack, it probably means that losing a few batches of messages is not that important. If a client does care about data loss, it can choose ack of 1 or -1. The throughput will be lower, but there are other ways to improve it (e.g., using a larger batch size and/or more producer instances).

        Guozhang, patch v3 looks good to me overall. A few more comments:

        30. SyncProducerTest.testMessagesizeTooLargeWithAckZero(): You hardcoded the sleep to 500 ms. Could you change it to a waitUntil-style wait so that the test can finish early once the condition has been met?

        31. KafkaApi.handleProducerRequest(): The logging should probably be at debug level since this doesn't indicate an error at the broker. It's really an error for the client.

        Guozhang Wang added a comment -

        Thanks for the comments Jun.

        30. Done.
        31. On second thought, I realized that we do not need to sleep, since the second message is large enough to cause the socket buffer to flush immediately, by which time the socket close should have been triggered by the server. This has been verified in the unit test.

        Made some minor changes to comments and rebased on 0.8.

        Neha Narkhede added a comment -

        Thanks for the patches, Guozhang. I reviewed patch v4 and here are some comments -

        KafkaApis and SocketServer
        1.1 One way to allow the socket server to close the channel is to just mark the request's key cancelled in the Response object. This way, when the socket server handles the response, it will throw a CancelledKeyException, and we close the key in that case (see the sketch at the end of this comment). One advantage of this approach is that we can avoid introducing the close-socket flag just to handle this case. To make sure the request metrics are always updated, we can move curr.request.updateRequestMetrics to the first statement in the (curr.responseSend == null) block.

        1.2 I think the below warn message can be improved -
        Sending the close socket signal due to error handling produce request [%s] with Ack=0

        Let's include the client id, correlation id and list of topics and partitions that this request had. This is probably more useful than printing the entire produce request as is, since that attempts to print things like ByteBufferMessageSet and is unreadable.
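
        As a toy java.nio illustration of the mechanism in 1.1 (this is not Kafka code): once the API layer cancels the request's SelectionKey, the socket server's next attempt to use that key throws CancelledKeyException, which can be caught and turned into a connection close.

        import java.net.InetSocketAddress
        import java.nio.channels.{CancelledKeyException, SelectionKey, Selector, ServerSocketChannel}

        object CancelledKeySketch {
          def main(args: Array[String]): Unit = {
            val selector = Selector.open()
            val server = ServerSocketChannel.open()
            server.bind(new InetSocketAddress(0))
            server.configureBlocking(false)
            val key = server.register(selector, SelectionKey.OP_ACCEPT)

            key.cancel() // what the API layer would do to signal "drop this connection"

            try {
              key.interestOps(SelectionKey.OP_READ) // the processor touches the key while responding
            } catch {
              case _: CancelledKeyException =>
                println("CancelledKeyException -> closing the connection")
                key.channel().close()
            }
            selector.close()
          }
        }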

        Guozhang Wang added a comment -

        Thanks for the comments Neha.

        1.1. Great point. The only concern is that now KafkaApis needs to know that the requestKey is actually a java.nio.channels.SelectionKey. But I think this is fine.

        1.2. Done.

        Guozhang Wang added a comment -

        After talking with people, I now propose an approach similar to v4 but generalized with a responseCode instead of just a close-socket flag. On the SocketServer side, the processor acts based on the code instead of checking whether the responseSend is null or not.

        Also changed aliveBrokers in KafkaApis from var to val, since it is never overwritten during its lifetime.

        Neha Narkhede added a comment -

        I like the way the responseCode is generalized. Patch v6 looks good; a few minor comments before check-in -

        1. Remove unused variable allBrokers from KafkaApis
        2. This comment needs to be changed according to the new response code logic -
        // a null response send object indicates

        Maybe we should wait for a review from Jay Kreps, since he has the most context on the socket server.

        Jay Kreps added a comment -

        Great fix. A few minor comments, mostly stylistic.

        RequestChannel.scala:
        1. This usage exposes a bit much:
        requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null, RequestChannel.CloseSocket))
        I think it might be nicer to have this instead:
        requestChannel.close(request.processor, request)
        and
        requestChannel.noResponse(req.processor, request)
        Implementation would be the same; it would just be a little clearer for the user, and the response codes can be private.

        Likewise in the response object I should be able to

        2. These are a little confusing:
        val SendResponse: Short = 0
        val NoResponse: Short = 1
        val CloseSocket: Short = 9
        Why is it 0, 1, and 9?

        What is the relationship between these and ErrorMapping? It should be clear from reading.

        Is there a reason we can't use a case class
        case class ResponseAction
        case object SendAction extends ResponseAction
        case object NoOpAction extends ResponseAction
        case object CloseConnectionAction extends ResponseAction

        Then to use it:

        response.action match {
          case SendAction => // do send
          case NoOpAction => // read more
          case CloseConnectionAction => // something
        }

        This seems clearer to me and I don't think it is significantly more expensive.

        Can we also standardize the usage so that we no longer have the user EITHER give null or NoResponse? It should be one or the other.

        3. This logging "Cancelling the request key to notify socket server close the connection due to error handling produce request" is not informative to the user. What does it mean to cancel a key? What broke? What should they do? I also think this should be info unless we want the server admin to take some action (I don't think so, right? This is a normal occurrence).

        SocketServer.scala
        4. The comment "a null response send object" is retained, but we are no longer using null to indicate this; we are using RequestChannel.NoResponse. I think this comment is actually a little verbose given that we now have a nicely named response action.

        ProducerTest.scala:
        5. org.scalatest.TestFailedException: Is there a reason you are giving the full path here instead of importing it?

        Question on testing: what is the message loss rate with acks=0 under moderate load if we do something like a controlled shutdown with other replicas available?

        Guozhang Wang added a comment -

        Thanks for the comments, Neha, Jay.

        Neha:

        1. Done.
        2. Incorporated with Jay's comments.

        Jay:

        1. Done.
        2. Done. I ended up using a trait and objects, and let requestChannel.close and requestChannel.noOperation (I changed the name from noResponse since it matches NoOpAction better) create the responses themselves (see the sketch at the end of this comment).
        3. Done.
        4. Done.
        5. Done.

        Regarding your question, the message loss depends on the producer throughput and the queue size. The first message will always be silently dropped; once the producer notices the socket closure, it will stop producing and refresh its metadata, and if the producer queue fills up during this time it will drop more messages. So the answer would be in the range [1, produce-throughput * time-taken-to-refresh-metadata - queue-size].
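
        A self-contained model of the shape of the design described in point 2 above (names follow this discussion; the real code in kafka.network.RequestChannel and SocketServer carries more state than shown here):

        object ResponseActionSketch {
          sealed trait ResponseAction
          case object SendAction extends ResponseAction            // normal response to write back
          case object NoOpAction extends ResponseAction            // ack=0, no error: nothing to send
          case object CloseConnectionAction extends ResponseAction // ack=0 with error: drop the socket

          case class Response(processor: Int, action: ResponseAction)

          class RequestChannel {
            // Helpers so callers never construct the special responses by hand (Jay's point 1).
            def closeConnection(processor: Int): Response = Response(processor, CloseConnectionAction)
            def noOperation(processor: Int): Response = Response(processor, NoOpAction)
          }

          // What the socket server processor does for each action.
          def process(r: Response): String = r.action match {
            case SendAction            => "write the response back to the socket"
            case NoOpAction            => "nothing to send; register the key for reads again"
            case CloseConnectionAction => "close the socket; the producer notices on its next send"
          }

          def main(args: Array[String]): Unit = {
            val channel = new RequestChannel
            println(process(channel.closeConnection(processor = 0)))
            println(process(channel.noOperation(processor = 0)))
          }
        }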

        Jay Kreps added a comment -

        +1 Gorgeous.

        Neha Narkhede added a comment -

        This is great. +1. One improvement on logging -

        info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.mkString("[",",","]")))

        Here we only want to print the topic and partition, so it seems that we should be printing the keys of the map, not the entire map?
        produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")

        I can make this change on checkin.

        Neha Narkhede added a comment -

        Committed patch v7 to 0.8 after making the logging fix described above

        Jun Rao added a comment -

        Thanks for patch v7. A couple of more comments.

        70. There is a long-standing bug in ProducerRequest.handleError(). If ack=0, we shouldn't send a response when the broker hits an unexpected error. We should either close the socket connection or send no response. Not sure which one is better.

        71. A minor issue. The following comment in RequestChannel is a bit confusing. It sounds as if more data needs to be read over the network to complete this request, but that is not the case.
        /** No operation to take for the request, need to read more over the network */
        def noOperation(processor: Int, request: RequestChannel.Request) {

        Guozhang Wang added a comment -

        Created reviewboard https://reviews.apache.org/r/14140/

        Jun Rao added a comment -

        Thanks for the followup patch. +1 and committed to 0.8.


          People

          • Assignee:
            Guozhang Wang
            Reporter:
            Jason Rosenberg
          • Votes:
            0
            Watchers:
            7
