KAFKA-1475: Kafka consumer stops LeaderFinder/FetcherThreads, but application does not know

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: clients, consumer
    • Labels:
    • Environment:
      linux, rhel 6.4

      Description

      We encountered an issue in production where consumers stopped consuming messages. (This consumer has its own consumer group and consumes just one topic with 3 partitions.)

      Based on the logs, we have the following findings:
      1. The Zookeeper session expired; the Kafka high-level consumer detected this event, released its old broker partition ownership, and re-registered the consumer.
      2. While creating its ephemeral path in Zookeeper, it found that the path still existed and tried to read the content of the node.
      3. After reading back the content, it found the content was the same as what it was about to write, so it logged "[ZkClient-EventThread-428-ZK/kafka] (kafka.utils.ZkUtils$) - /consumers/consumerA/ids/consumerA-1400815740329-5055aeee exists with value { "pattern":"static", "subscription": { "TOPIC": 1 }, "timestamp":"1400846114845", "version":1 } during connection loss; this is ok" and did nothing.
      4. After that, it threw an exception during rebalance whose cause was "org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/consumerA/ids/consumerA-1400815740329-5055aeee".
      5. After all retries failed, it gave up retrying, leaving the LeaderFinderThread and FetcherThreads stopped.

      Step 3 looks very weird. Checking the code, there is a timestamp contained in the stored data, so this may be caused by a Zookeeper issue.

      But what I am wondering is whether it is possible to let the application (Kafka client users) know that the underlying LeaderFinderThread and FetcherThread have stopped, for example by allowing the application to register a callback or by throwing an exception (e.g. by invalidating the KafkaStream iterator)? To me it is not reasonable for the Kafka client to shut everything down and wait for the next rebalance while the application waits on iterator.hasNext() without knowing that something is wrong underneath.
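
      For illustration only, the kind of notification hook being asked for might look like the sketch below; the trait and the registration call are hypothetical and do not exist in the 0.8 client.

        // Hypothetical API, sketched only to illustrate the request; not part of Kafka 0.8.
        trait ConsumerFailureListener {
          // invoked if the underlying LeaderFinderThread/FetcherThreads shut down unexpectedly
          def onFetchersStopped(cause: Throwable): Unit
        }

        // hypothetical usage on the ConsumerConnector:
        // connector.registerFailureListener(new ConsumerFailureListener {
        //   def onFetchersStopped(cause: Throwable): Unit = alertAndRestart(cause)
        // })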

      I've read the wiki about the Kafka 0.9 consumer rewrite, and there is a ConsumerRebalanceCallback interface, but I am not sure how long it will take to be ready, or how long it will take for us to migrate.

      Please help to look at this issue. Thanks very much!

        Activity

        Hang Qi added a comment -

        Not often. It mainly happens after a long full GC on the Kafka consumer side, or under heavy load on the ZK server side.

        Thanks for the reference, I will take a look at it.

        Guozhang Wang added a comment -

        I see. This seems to me to be a ZkClient issue, where it did not deliver all the events in the event queue back to the caller. We have had some similar findings before (https://issues.apache.org/jira/browse/KAFKA-1387). Do you see this issue often?

        Hang Qi added a comment -

        Not exactly. Let me recap.

        In the original description, step 3:
        3. After reading back the content, it found the content was the same as what it was about to write, so it logged "[ZkClient-EventThread-428-ZK/kafka] (kafka.utils.ZkUtils$) - /consumers/consumerA/ids/consumerA-1400815740329-5055aeee exists with value {"pattern":"static", "subscription": {"TOPIC": 1}, "timestamp":"1400846114845", "version":1}"

        The timestamp is 1400846114845 = 11:55:14,845 UTC 2014, which is 07:55:14,845 EDT (our logs use EDT).
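
        As a quick sanity check of that epoch-millis conversion (a minimal sketch using java.time, nothing Kafka-specific):

          import java.time.{Instant, ZoneId}

          val ts = 1400846114845L
          Instant.ofEpochMilli(ts)                                        // 2014-05-23T11:55:14.845Z (UTC)
          Instant.ofEpochMilli(ts).atZone(ZoneId.of("America/New_York")) // 2014-05-23T07:55:14.845-04:00 (EDT)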

        Looking at my second comment and also the logs in the attachment:
        1. 07:55:14 the zk client got a session expiry and propagated the event to the watcher. Thus the Kafka client tried to re-create the ephemeral node '/kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee'.

        That is when the Kafka client became aware that the zk session had expired and tried to re-create the ephemeral node.

        However, based on the following log:

        3. 07:55:26 the zk sender did not hear back from the server for the connect request, so it closed the connection and tried to reconnect.
        4. 07:55:40 the zk sender established a connection with mbus0005 and got session 0x545f6dc6f510757.
        5. 07:55:40 the zk sender got the response for creating the ephemeral node; the zk server responded that the node exists (response code -110).

        The zk client successfully connected to the zk cluster at 07:55:40, and then found that the ephemeral node had been created and its content was the same as what it was about to write.

        But the owner of this ephemeral node is 163808312244176699 = 0x245f6cac692073b, which is neither the session before the expiry (0x345f6cac6ed071d) nor the subsequent session (0x545f6dc6f510757).
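
        For reference, the decimal-to-hex conversion of that owner field (plain Long formatting, nothing ZooKeeper-specific):

          // ephemeralOwner from the znode stat, printed as the hex session id seen in the ZK client logs
          java.lang.Long.toHexString(163808312244176699L)   // "245f6cac692073b"
          // i.e. neither 0x345f6cac6ed071d (session before expiry) nor 0x545f6dc6f510757 (new session)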

        So I feel it is very weird. My guess is that, between 07:55:14 and 07:55:26, the zk client somehow sent the create request to mbus0002, and mbus0002 processed it successfully, but the response was never read by the client; session 0x245f6cac692073b was created at that time.

        Guozhang Wang added a comment -

        In this case, is the consumer using the same timestamp to re-register after the session timeout?

        Hang Qi added a comment -

        Hi Guozhang,

        Thanks for your reply.

        I checked the code between Kafka 0.8.0 and 0.8.1, and this piece of code looks the same.

        The function createEphemeralPathExpectConflictHandleZKBug first calls createEphemeralPathExpectConflict and then uses the checker to handle ZkNodeExistsException. However, createEphemeralPathExpectConflict itself has some logic to handle ZkNodeExistsException.

        /**
         * Create an ephemeral node with the given path and data.
         * Throw NodeExistException if node already exists.
         */
        def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
          try {
            createEphemeralPath(client, path, data)
          } catch {
            case e: ZkNodeExistsException => {
              // this can happen when there is connection loss; make sure the data is what we intend to write
              var storedData: String = null
              try {
                storedData = readData(client, path)._1
              } catch {
                case e1: ZkNoNodeException => // the node disappeared; treat as if the node existed and let the caller handle this
              }
              if (storedData == null || storedData != data) {
                info("conflict in " + path + " data: " + data + " stored data: " + storedData)
                throw e
              } else {
                // otherwise, the creation succeeded, return normally
                info(path + " exists with value " + data + " during connection loss; this is ok")
              }
            }
            case e2: Throwable => throw e2
          }
        }

        We observed the log line from info(path + " exists with value " + data + " during connection loss; this is ok"), which comes from createEphemeralPathExpectConflict.

        Guozhang Wang added a comment -

        Hello Hang,

        Yes, we are aware of this issue, but in zkClient 0.3 there is no API to get the session ID, so in 0.8.1 we tried to use the timestamp in the znode as a workaround, such that if the previous znode was written by an older session its timestamp should be different. You can try to upgrade to 0.8.1.1 to see if this issue goes away for you.


        def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
          while (true) {
            try {
              createEphemeralPathExpectConflict(zkClient, path, data)
              return
            } catch {
              case e: ZkNodeExistsException => {
                // An ephemeral node may still exist even after its corresponding session has expired
                // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
                // and hence the write succeeds without ZkNodeExistsException
                ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
                  case Some(writtenData) => {
                    if (checker(writtenData, expectedCallerData)) {
                      info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path)
                        + "hence I will backoff for this node to be deleted by Zookeeper and retry")
                      Thread.sleep(backoffTime)
                    } else {
                      throw e
                    }
                  }
                  case None => // the node disappeared; retry creating the ephemeral node immediately
                }
              }
              case e2: Throwable => throw e2
            }
          }
        }

        Hang Qi added a comment -

        Hi Guozhang,

        Thanks for your reply. In this case we have to add some health check at the application level, such as counting how many messages are received within a certain period (a rough sketch follows).
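
        A minimal sketch of such a health check, assuming the 0.8 high-level consumer's KafkaStream iterator loop; the counter name, the watchdog, and the 5-minute period are only illustrative:

          import java.util.concurrent.atomic.AtomicLong
          import java.util.concurrent.{Executors, TimeUnit}

          val messagesConsumed = new AtomicLong(0L)

          // in the consumer thread, bump the counter for every message pulled from the KafkaStream iterator:
          //   while (iter.hasNext()) { process(iter.next()); messagesConsumed.incrementAndGet() }

          // watchdog thread: if no messages arrived during a full period, alarm (or restart the consumer)
          val watchdog = Executors.newSingleThreadScheduledExecutor()
          watchdog.scheduleAtFixedRate(new Runnable {
            private var last = 0L
            override def run(): Unit = {
              val now = messagesConsumed.get()
              if (now == last)
                System.err.println("no messages consumed in the last period; fetcher threads may have stopped")
              last = now
            }
          }, 5, 5, TimeUnit.MINUTES)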

        I am still very curious about step 3 and want to dig out the root cause. I've collected the zkclient logs on the consumer side; however, the logs on the zk server side have been purged. But from the client-side logs we can still see something interesting.

        Following is the sequence flow:
        1. 07:55:14 the zk client got a session expiry and propagated the event to the watcher. Thus the Kafka client tried to re-create the ephemeral node '/kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee'.
        2. 07:55:14 the zk sender connected to mbus0002 and sent the connect request.
        3. 07:55:26 the zk sender did not hear back from the server for the connect request, so it closed the connection and tried to reconnect.
        4. 07:55:40 the zk sender established a connection with mbus0005 and got session 0x545f6dc6f510757.
        5. 07:55:40 the zk sender got the response for creating the ephemeral node; the zk server responded that the node exists (response code -110).
        6. 07:55:40 the Kafka client wanted to read /kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee; the zk sender sent the request and read the response, and the Kafka client noticed that the node's data content is the same as what it wanted to write (#7b20227061747465726e223a22737461746963222c2022737562736372697074696f6e223a7b202253746f70416c6c5f55532d45415354223a2031207d2c202274696d657374616d70223a2231343030383436313134383435222c202276657273696f6e223a31207d; see the decoding sketch after this list). Note that the owner of the ephemeral node in the stat (s{150396710990,150396710990,1400846114866,1400846114866,0,0,0,163808312244176699,105,0,150396710990}) is 163808312244176699 = 0x245f6cac692073b, which is neither the current session nor the session id from before the session expired (0x345f6cac6ed071d).
        7. 07:56:07 the Kafka client's attempt to read the ephemeral node failed; zk responded that the node does not exist.
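
        To decode the hex dump from step 6 (plain hex-to-ASCII, nothing ZooKeeper-specific), a small helper like the one below can be used; it yields the same pattern/subscription/timestamp/version JSON quoted in step 3 of the description:

          // decode a hex dump such as the "#7b2022..." bytes above (strip the leading '#')
          def hexToString(hex: String): String =
            hex.grouped(2).map(h => Integer.parseInt(h, 16).toChar).mkString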

        One more thing: another zk client also read the same content and stat for the path /kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee; the zk server it connected to was mbus0002.

        The weird thing is that there is no log entry for sessionId 0x245f6cac692073b anywhere in the log. So my gut feeling is that, between 07:55:14 and 07:55:26, the zk client somehow sent the create request to mbus0002, and mbus0002 processed it successfully, but the response was never read by the client; session 0x245f6cac692073b was created at that time.

        However, checking the ZK client code, this should not happen: the zk client is supposed to send requests to the server only when it is in the syncConnected state.

        Do you have any idea or comments about this?

        Given this issue, I feel it is safer to check the sessionId of the ephemeral node rather than the timestamp when recovering from a ZK session expiry (a rough sketch follows).
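
        A minimal sketch of that check, using the raw ZooKeeper API rather than zkclient 0.3 (which, as noted above, does not expose the session id); the helper name is illustrative:

          import org.apache.zookeeper.ZooKeeper
          import org.apache.zookeeper.data.Stat

          // compare the ephemeral node's owner with the current session instead of the stored timestamp
          def ownedByCurrentSession(zk: ZooKeeper, path: String): Boolean = {
            val stat = new Stat()
            zk.getData(path, false, stat)            // throws NoNodeException if the node is gone
            stat.getEphemeralOwner == zk.getSessionId
          }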

        After all, as you said, the Kafka 0.8 consumer will be obsoleted anyway. But I want to call this out because the same code in utils/ZkUtils.scala is shared with the broker.

        Thanks
        Hang Qi

        Guozhang Wang added a comment -

        This is a valid point, and it is a little tricky to expose the exceptions from background threads. That is also why in the new consumer design we are going to be single threaded.

        Since the new consumer is coming in 3 to 4 months, I am wondering, if this issue does not occur often, whether we can focus our effort on getting the new consumer into production instead of patching the old one that is going to be obsoleted anyway?


          People

          • Assignee: Neha Narkhede
          • Reporter: Hang Qi
          • Votes: 0
          • Watchers: 2
