Kafka / KAFKA-706

broker appears to be encoding ProduceResponse, but never sending it

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None
    • Environment:
      reproduced on both Mac OS and RH Linux, via private Node.js client

      Description

      By all appearances, I seem to be able to convince a broker to periodically encode, but never transmit, a ProduceResponse. Unfortunately my client is proprietary, but I will share it with Neha via LI channels. In the meantime, I will describe what's going on in the hopes that there's another trivial way to reproduce it. (I did search through JIRA and haven't found anything that looks like this.)

      I am running a single-instance ZooKeeper and a single broker. I have a client that generates configurable amounts of data, tracking what is produced (both sent and ACK'd) and what is consumed. I noticed that when using high transfer rates via high-frequency single messages, my unack'd queue appeared to grow continuously. So I outfitted my client to log more information about correlation IDs at various stages, and modified the Kafka ProducerRequest/ProducerResponse classes to log (de)serialization of the same. I then used tcpdump to intercept all communications between my client and the broker.

      Finally, I configured my client to generate 1 message per ~10ms, each payload being approximately 33 bytes; requestAckTimeout was set to 2000ms, and requestAcksRequired was set to 1. I used 10ms because I found that 5ms or less caused my unacked queue to build up due to system speed – it simply couldn't keep up. 10ms keeps the load high, but just manageable. YMMV with that param. All of this is done on a single host, over loopback. I ran it on both my MacBook Air and a well-set-up RH Linux box, and found the same problem.

      At startup, my system logged "expired" requests - meaning requests that were sent, but for which no ACK, positive or negative, was seen from the broker within 1.25x the requestAckTimeout (i.e., 2500ms). I would let it settle until the unacked queue was stable at or around 0.

      What I found is this: ACKs are normally generated within milliseconds. This was demonstrated by the logging I added to the Scala ProducerRe* classes, and they are normally seen quickly by my client. But when the actual error occurs, namely that a request is ignored, the ProducerResponse class does encode the correct correlationId; however, a response containing that ID is never sent over the network, as evidenced by my tcpdump traces. In my experience this would take anywhere from 3-15 seconds to occur after the system was warm, meaning that on average it is 1 out of several hundred requests that shows the condition.

      While I can't attach my client code, I could attach logs; but since my intention is to share the code with LI people, I will wait to see if that's useful here.
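      For reference, the following is a minimal Scala sketch approximating that load pattern against a single local 0.8 broker. The actual client is a proprietary Node.js program, so the topic name, broker address, and the use of the blocking Scala producer here are illustrative assumptions; in particular, a sync producer does not pipeline requests over one connection the way the Node.js client does, so this only reproduces the message rate and ack settings, not necessarily the failure itself.

          import java.util.Properties
          import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

          // Hypothetical stand-in for the proprietary Node.js client: one ~33-byte
          // message every ~10 ms, requestAcksRequired = 1, requestAckTimeout = 2000 ms.
          object ProduceLoad extends App {
            val props = new Properties()
            props.put("metadata.broker.list", "localhost:9092")             // single broker, loopback
            props.put("serializer.class", "kafka.serializer.StringEncoder")
            props.put("request.required.acks", "1")                         // ack from the leader only
            props.put("request.timeout.ms", "2000")                         // 2000 ms ack timeout
            props.put("producer.type", "sync")

            val producer = new Producer[String, String](new ProducerConfig(props))
            var i = 0L
            while (true) {
              producer.send(new KeyedMessage[String, String]("test", f"payload-$i%025d")) // ~33-byte payload
              i += 1
              Thread.sleep(10)                                              // ~1 message per 10 ms
            }
          }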

        Issue Links

          Activity

          Transition            Time In Source Status   Execution Times   Last Executer        Last Execution Date
          Open → Resolved       36d 16h 21m              1                 Sriram Subramanian   22/Feb/13 07:51
          Resolved → Reopened   7h 26m                   1                 Neha Narkhede        22/Feb/13 15:18
          Reopened → Resolved   3d 3h 35m                1                 Sriram Subramanian   25/Feb/13 18:53
          Resolved → Closed     7s                       1                 Sriram Subramanian   25/Feb/13 18:54
          Tony Stevenson made changes -
          Workflow: Apache Kafka Workflow [ 13052067 ] → no-reopen-closed, patch-avail [ 13054227 ]
          Tony Stevenson made changes -
          Workflow: no-reopen-closed, patch-avail [ 12746259 ] → Apache Kafka Workflow [ 13052067 ]
          Gavin made changes -
          Link: This issue depends upon KAFKA-736 [ KAFKA-736 ]
          Gavin made changes -
          Link: This issue depends on KAFKA-736 [ KAFKA-736 ]
          Sriram Subramanian made changes -
          Status: Resolved [ 5 ] → Closed [ 6 ]
          Sriram Subramanian made changes -
          Status: Reopened [ 4 ] → Resolved [ 5 ]
          Resolution: Fixed [ 1 ]
          Sriram Subramanian added a comment -

          The dependent bug has been fixed.

          Neha Narkhede made changes -
          Link: This issue is blocked by KAFKA-736 [ KAFKA-736 ]
          Neha Narkhede made changes -
          Resolution: Fixed [ 1 ]
          Status: Resolved [ 5 ] → Reopened [ 4 ]
          Neha Narkhede added a comment -

          This bug will be fixed by the v2 patch for KAFKA-736.

          Sriram Subramanian made changes -
          Status: Open [ 1 ] → Resolved [ 5 ]
          Resolution: Fixed [ 1 ]
          Sriram Subramanian made changes -
          Link: This issue depends on KAFKA-736 [ KAFKA-736 ]
          Sriram Subramanian added a comment -

          It has been verified that the v2 patch fixes this issue. Thank you, Ben, for your help with this bug.

          Neha Narkhede added a comment -

          Great catch, Sriram! I think the v2 patch on KAFKA-736 might solve this problem.

          Sriram Subramanian added a comment -

          I think I know what is happening here.

          Our current server is not yet suitable for async I/O from the client. As part of the processor thread, we continuously invoke processNewResponse on each iteration. processNewResponse does the following:

          1. dequeue the response from the response queue
          2. set the interest bit of the selector key to write
          3. attach the response to the key

          The problem is that we don't check whether the previous response attached to the key has already been sent. We just replace the response and hence drop arbitrary responses. This should not happen with the v2 patch for KAFKA-736, since we would serialize the requests from a client.
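          A minimal sketch of the pattern described above; the Response type and queue here are illustrative stand-ins, not the actual kafka.network.SocketServer code:

              import java.nio.ByteBuffer
              import java.nio.channels.SelectionKey
              import java.util.concurrent.ConcurrentLinkedQueue

              // Illustrative stand-ins for the broker's per-processor response queue
              // and the response objects queued on it.
              case class Response(correlationId: Int, buffer: ByteBuffer, key: SelectionKey)

              object ProcessorSketch {
                val responseQueue = new ConcurrentLinkedQueue[Response]()

                def processNewResponses(): Unit = {
                  var curr = responseQueue.poll()                 // 1. dequeue a response
                  while (curr != null) {
                    curr.key.interestOps(SelectionKey.OP_WRITE)   // 2. set the key's interest bit to write
                    curr.key.attach(curr)                         // 3. attach the response to the key
                    // BUG: nothing checks whether a previously attached response has been
                    // written out yet; if the selector has not fired for it, it is replaced
                    // here and that correlation id never goes out on the wire.
                    curr = responseQueue.poll()
                  }
                }
              }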

          Sriram Subramanian made changes -
          Assignee: Sriram Subramanian [ sriramsub ]
          ben fleis created issue -

            People

            • Assignee: Sriram Subramanian
            • Reporter: ben fleis
            • Votes: 0
            • Watchers: 3
