Kafka / KAFKA-320

testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.7, 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      The testZKSendWithDeadBroker test in ProducerTest fails intermittently with the following exception:

      [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
      java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
      at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
      at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
      at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
      at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
      at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:597)
      at junit.framework.TestCase.runTest(TestCase.java:164)
      at junit.framework.TestCase.runBare(TestCase.java:130)
      at junit.framework.TestResult$1.protect(TestResult.java:110)
      at junit.framework.TestResult.runProtected(TestResult.java:128)
      at junit.framework.TestResult.run(TestResult.java:113)
      at junit.framework.TestCase.run(TestCase.java:120)
      at junit.framework.TestSuite.runTest(TestSuite.java:228)
      at junit.framework.TestSuite.run(TestSuite.java:223)
      at junit.framework.TestSuite.runTest(TestSuite.java:228)
      at junit.framework.TestSuite.run(TestSuite.java:223)
      at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
      at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
      at sbt.TestRunner.run(TestFramework.scala:53)
      at sbt.TestRunner.runTest$1(TestFramework.scala:67)
      at sbt.TestRunner.run(TestFramework.scala:76)
      at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
      at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
      at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
      at sbt.NamedTestTask.run(TestFramework.scala:92)
      at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
      at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
      at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
      at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
      at sbt.impl.RunTask.runTask(RunTask.scala:85)
      at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
      at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
      at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
      at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
      at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
      at sbt.Control$.trapUnit(Control.scala:19)
      at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

      The test restarts a server and fails with this exception during the restart.

      This is unexpected: after server1 shuts down, its broker id registration should be deleted from ZooKeeper. Here is the Kafka bug that causes the problem:

      In the test, server1.shutdown() closes the zkClient associated with the broker, which successfully deletes the broker's registration info from ZooKeeper. After this, server1 can be started again successfully. Then the test completes, and teardown() calls server1.shutdown() again. This time the server does not actually shut down, because shutdown() is guarded by the isShuttingDown variable, which is never reset to false in the startup() API. As a result, the zkClient connection opened by the restarted server stays open after the current test run.
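
To make the failure mode concrete, here is a minimal Scala sketch of the lifecycle described above. It is a simplified model, not the actual KafkaServer code: the names BuggyServer, LeakyZkClient and RestartScenario are hypothetical, and the shutdown guard is assumed to be an AtomicBoolean flipped with compareAndSet. Because startup() never resets isShuttingDown, the shutdown() call made from teardown() is ignored and the restarted server's client stays open.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the broker's ZooKeeper client; it only tracks open/closed.
class LeakyZkClient {
  @volatile var isOpen = true
  def close(): Unit = { isOpen = false }
}

// Simplified, hypothetical model of the buggy server lifecycle (not Kafka's code).
class BuggyServer {
  private val isShuttingDown = new AtomicBoolean(false)
  var zkClient: LeakyZkClient = null

  def startup(): Unit = {
    // BUG: isShuttingDown is never reset here, so after the first shutdown
    // every later shutdown() call is silently skipped.
    zkClient = new LeakyZkClient
  }

  def shutdown(): Unit = {
    // Only a caller that flips the flag from false to true actually shuts down.
    if (isShuttingDown.compareAndSet(false, true)) {
      zkClient.close()
    }
  }
}

object RestartScenario {
  // Mirrors the test: start, shut down, restart, then teardown() shuts down again.
  def restartedClientLeftOpen(): Boolean = {
    val server = new BuggyServer
    server.startup()
    server.shutdown()               // first shutdown closes the first client
    server.startup()                // restart inside the test
    val restarted = server.zkClient
    server.shutdown()               // teardown(): ignored, the flag is still true
    restarted.isOpen
  }
}
```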

      If you re-run ProducerTest without exiting sbt, the test first brings up the ZooKeeper server. Since the Kafka server from the previous run is still running, it can successfully renew its session with ZooKeeper and retain the /brokers/ids/0 ephemeral node. If it does this before server1.startup() is called in the test, the test fails.
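
The registration conflict itself can be modeled with a small in-memory stand-in for ZooKeeper ephemeral nodes. This is a hypothetical sketch (FakeZk and StaleSessionScenario are not ZooKeeper or ZkClient APIs): each node remembers the session that created it, so while the leaked server's session is alive a second registration on /brokers/ids/0 fails, and it succeeds once that session is closed.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical in-memory model of ephemeral nodes: path -> owning session id.
class FakeZk {
  private val ephemeral = mutable.Map.empty[String, Long]

  def createEphemeral(path: String, sessionId: Long): Unit = {
    // Mirrors the check that produced the RuntimeException in the stack trace.
    if (ephemeral.contains(path))
      throw new RuntimeException(s"A broker is already registered on the path $path.")
    ephemeral(path) = sessionId
  }

  // Closing (or expiring) a session removes every ephemeral node it owns.
  def closeSession(sessionId: Long): Unit = {
    val owned = ephemeral.collect { case (p, owner) if owner == sessionId => p }.toList
    owned.foreach(p => ephemeral.remove(p))
  }

  def owner(path: String): Option[Long] = ephemeral.get(path)
}

object StaleSessionScenario {
  val path = "/brokers/ids/0"

  /** Returns (first registration by server1 failed, owner after the leaked session closes). */
  def run(): (Boolean, Option[Long]) = {
    val zk = new FakeZk
    val leakedSession = 1L   // session kept alive by the server leaked from the previous run
    val server1Session = 2L  // session of server1 in the re-run test
    zk.createEphemeral(path, leakedSession)
    val firstAttemptFailed = Try(zk.createEphemeral(path, server1Session)).isFailure
    zk.closeSession(leakedSession)   // what a proper shutdown would have done
    zk.createEphemeral(path, server1Session)
    (firstAttemptFailed, zk.owner(path))
  }
}
```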

      The fix is to reset the shutdown-related variables correctly in the startup API of KafkaServer. While debugging this, I also found that the Producer does not close its zkclient either. Because of this, unit tests emit many WARN messages like the following:

      [2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn:1061)
      [2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
      java.net.ConnectException: Connection refused
      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
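
A minimal sketch of both fixes, assuming the shutdown guard is an AtomicBoolean; FixedServer, TrackedClient, ProducerSketch and FixedRestartScenario are hypothetical names, not Kafka classes. startup() re-arms the shutdown flag so a restarted server can be shut down again, and the producer closes the ZooKeeper client it owns.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical closeable client standing in for ZkClient.
class TrackedClient {
  @volatile var isOpen = true
  def close(): Unit = { isOpen = false }
}

// Hypothetical server: startup() resets the shutdown flag on every start.
class FixedServer {
  private val isShuttingDown = new AtomicBoolean(false)
  var zkClient: TrackedClient = null

  def startup(): Unit = {
    isShuttingDown.set(false) // the fix: re-arm shutdown for this run
    zkClient = new TrackedClient
  }

  def shutdown(): Unit =
    if (isShuttingDown.compareAndSet(false, true)) zkClient.close()
}

// Hypothetical producer that owns a ZooKeeper client and releases it on close().
class ProducerSketch {
  val zkClient = new TrackedClient
  def close(): Unit = zkClient.close()
}

object FixedRestartScenario {
  // Same sequence as the failing test; the second shutdown now closes the client.
  def restartedClientClosed(): Boolean = {
    val server = new FixedServer
    server.startup()
    server.shutdown()
    server.startup()
    val restarted = server.zkClient
    server.shutdown()
    !restarted.isOpen
  }
}
```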

      Attachments
      1. kafka-320.patch (3 kB, Neha Narkhede)
      2. kafka-320-v2.patch (31 kB, Neha Narkhede)
      3. kafka-320-v3.patch (25 kB, Neha Narkhede)
      4. kafka-320-v3-delta.patch (2 kB, Jun Rao)

        Activity

        Neha Narkhede added a comment -

        Checked into trunk and 0.8 branch

        Jun Rao added a comment -

        OK, we can take v3 for now and track the broker startup/shutdown changes in KAFKA-328.

        Neha Narkhede added a comment -

        After applying the v3-delta patch, I see that some zkclient connections are not closed cleanly:
        [2012-04-06 17:34:12,805] WARN Exception causing close of session 0x0 due to java.io.IOException: ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn:639)
        [2012-04-06 17:34:12,809] WARN Exception causing close of session 0x0 due to java.io.IOException: ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn:639)
        [2012-04-06 17:34:13,201] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x1368a38c37a0005, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:634)
        [2012-04-06 17:34:13,264] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x1368a38a86a0080, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:634)

        Also, between setting the isShutdown variable and checking canStart, startup and shutdown can interleave, which can leave a ZooKeeper client connection open.
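
The underlying problem is that "set one flag, then check another" is two separate steps, and another thread can run in between them. A single atomic test-and-set closes that window. This is an illustrative sketch only: AtomicGuardDemo is a hypothetical name, and it does not reproduce the exact interleaving in the v3-delta patch. It releases several threads at once against one compareAndSet and counts how many get through.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical demo: many concurrent "startup" attempts guarded by one atomic flag.
object AtomicGuardDemo {
  /** Launches `n` threads that all try to start at once; returns how many succeeded. */
  def concurrentStartWinners(n: Int): Int = {
    val startable = new AtomicBoolean(true)
    val winners = new AtomicInteger(0)
    val go = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(n)
    for (_ <- 1 to n) pool.execute(() => {
      go.await() // line every thread up so they race on the flag together
      // Check and update happen as one indivisible step, so no other thread
      // can slip in between reading "startable" and clearing it.
      if (startable.compareAndSet(true, false)) winners.incrementAndGet()
    })
    go.countDown()
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    winners.get()
  }
}
```

However the threads are scheduled, exactly one of them wins; with a separate read and write, two threads could both observe the flag as true before either clears it.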

        Jun Rao added a comment -

        Overall, patch v3 looks good. I made a minor tweak to KafkaServer on top of v3. How does that look? To try it, you will need to:
        1. apply patch v3
        2. svn revert core/src/main/scala/kafka/server/KafkaServer.scala
        3. apply patch kafka-320-v3-delta.patch

        Neha Narkhede added a comment -

        Filed KAFKA-328 for improving startup and shutdown API of Kafka server.

        Kept everything from v2 except the more complex changes to the Kafka server.

        Jun Rao added a comment -

        Yes, we can use another JIRA to look at improving Kafka server startup and shutdown. For this JIRA, we can make only minimal changes to the Kafka server.

        Neha Narkhede added a comment -

        Thanks for the review!

        4. Regarding this, what do people think about the conditions under which a Kafka server should be allowed to start up and shut down (listed under 2.1 and 2.2 in my earlier comment)?
        5. Will fix this before checkin.
        6. Also, improving Kafka server startup and shutdown looks orthogonal to this bug fix. Can it be done (cleanly) in another JIRA? I'd like the checkin to include just the fix for this issue.

        Jun Rao added a comment -

        The ZookeeperTestHarness change looks nice. A couple more comments:

        4. KafkaServer: It does look a bit more complex now, and some of the checks are not done atomically. How about the following?
        4.1 Add an AtomicBoolean isServerStartable and initialize it to true.
        4.2 In startup(), if we can atomically set isServerStartable from true to false, proceed with startup; otherwise, throw an exception.
        4.3 In shutdown(), if isServerStartable is false, proceed with shutdown; at the very end, set isServerStartable to true.
        startup() and shutdown() are expected to be called from the same thread, so we can expect that shutdown won't be called until startup completes.

        5. SyncProducerTest: unused imports
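
Item 4 can be sketched in Scala roughly as follows. GuardedServer and its running field are hypothetical stand-ins for KafkaServer and its components, and the sketch relies on the stated assumption that startup() and shutdown() are called from the same thread.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the proposed single-flag guard (not the committed KafkaServer code).
class GuardedServer {
  // 4.1: the server is startable until its first successful startup().
  private val isServerStartable = new AtomicBoolean(true)
  @volatile var running = false

  def startup(): Unit = {
    // 4.2: only one caller can flip true -> false; any other caller is rejected.
    if (!isServerStartable.compareAndSet(true, false))
      throw new IllegalStateException("server is already started")
    running = true // start socket server, register in ZooKeeper, etc.
  }

  def shutdown(): Unit = {
    // 4.3: only shut down a server that was actually started ...
    if (!isServerStartable.get()) {
      running = false // stop components, close the ZooKeeper client, etc.
      // ... and make it startable again only after shutdown has finished.
      isServerStartable.set(true)
    }
  }
}
```

Setting the flag back to true only at the very end of shutdown() means a startup() attempted while shutdown is still in progress is rejected rather than interleaved with it.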

        Neha Narkhede added a comment -

        OK, I made some more changes -

        1. Cleaned up zkClient instance creation in unit tests. It is now wrapped inside ZookeeperTestHarness, which ensures that it gets cleaned up at the appropriate time.

        2. Changed Kafka server startup and shutdown behavior, possibly making it more complex. Basically:

        2.1 A Kafka server can start up only if it is not already starting up, is not currently being shut down, and has not already been started.

        2.2 A Kafka server can shut down only if it is not already shutting down, is not currently starting up, and has not already been shut down.
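
Read as conditions that must all hold, rules 2.1 and 2.2 amount to a small state machine: startup is allowed only from a stopped state, and shutdown only from a running state. A hypothetical sketch using an AtomicReference to hold the state (LifecycleServer and the ServerState objects are illustrative names, not Kafka code):

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical lifecycle states for rules 2.1 and 2.2.
sealed trait ServerState
case object Stopped extends ServerState
case object Starting extends ServerState
case object Running extends ServerState
case object ShuttingDown extends ServerState

class LifecycleServer {
  private val state = new AtomicReference[ServerState](Stopped)
  def currentState: ServerState = state.get()

  /** 2.1: start only from Stopped (not starting, not shutting down, not already started). */
  def startup(): Boolean =
    if (state.compareAndSet(Stopped, Starting)) {
      // ... start components here ...
      state.set(Running)
      true
    } else false

  /** 2.2: shut down only from Running (not shutting down, not starting, not already shut down). */
  def shutdown(): Boolean =
    if (state.compareAndSet(Running, ShuttingDown)) {
      // ... stop components and close the ZooKeeper client here ...
      state.set(Stopped)
      true
    } else false
}
```

Compared with the single AtomicBoolean in item 4 of Jun's review, explicit states make the transient Starting and ShuttingDown phases visible, at the cost of more code to maintain.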

        Prashanth Menon added a comment -

        Great catch! +1, looks good and works on my machine.

        Jun Rao added a comment (edited) -

        That's a good finding. We should probably patch it in both trunk and 0.8. Just one comment.

        We should only allow a KafkaServer to startup if it has been shutdown.

        Neha Narkhede added a comment -

        Can someone review this?

        Neha Narkhede added a comment -

        This patch includes the following -

        1. Fixes the kafka server restart bug by resetting the shutdown state variables in the startup() API of the KafkaServer.

        2. Shuts down the zkclient in the Producer.

        3. ZkClient and ZooKeeper logging can be set to WARN, now that the real issue causing the many warnings during the unit tests is fixed.


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 0
