Details

    • Type: Bug
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0, 0.8.1
    • Fix Version/s: 0.9.0
    • Component/s: core
    • Labels:

      Description

      The server logs excessively, which makes the logs unreadable and, in practice, degrades server performance. We need to clean up the logging to address both issues.

      1. KAFKA-1352.patch
        3 kB
        Ivan Lyutov
      2. KAFKA-1352_2014-04-04_21:20:31.patch
        3 kB
        Ivan Lyutov
      3. KAFKA-1352.patch
        17 kB
        Guozhang Wang


          Activity

          Jun Rao added a comment -

          Some examples of excessive logging.

          2014/03/26 00:56:23.605 ERROR [KafkaApis] [kafka-request-handler-12] [kafka-server] [] [KafkaApi-512] Error while fetching metadata for partition [xxx,4]
          kafka.common.ReplicaNotAvailableException
          at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:552)
          at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:537)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.immutable.List.map(List.scala:45)
          at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:537)
          at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:533)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
          at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
          at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
          at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.immutable.HashSet.map(HashSet.scala:32)
          at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:533)

          2014/03/26 07:42:01.845 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-4-516] [kafka-server] [] [ReplicaFetcherThread-4-516], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 842728; ClientId: ReplicaFetcherThread-4-516; ReplicaId: 512; MaxWait: 0 ms; MinBytes: 1 bytes; RequestInfo: [ xxx ]
          java.net.SocketTimeoutException
          at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
          at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
          at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
          at kafka.utils.Utils$.read(Utils.scala:375)
          at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
          at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
          at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
          at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
          at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
          at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
          at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
          at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
          at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)

          In both cases, at least the stack trace is not useful.
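
A minimal sketch of the alternative, assuming the intent is to log the error context with the exception class and message instead of the full trace. `ErrorSummary` and `oneLine` are hypothetical names used only for illustration; they are not part of Kafka or log4j.

```java
// Hypothetical helper: builds a one-line summary of an exception so a
// logger can emit it without the stack trace.
final class ErrorSummary {
    private ErrorSummary() {}

    /** Returns "context: exception.class.Name" plus " (message)" when a message exists. */
    static String oneLine(String context, Throwable error) {
        StringBuilder sb = new StringBuilder(context)
            .append(": ")
            .append(error.getClass().getName());
        if (error.getMessage() != null) {
            sb.append(" (").append(error.getMessage()).append(')');
        }
        return sb.toString();
    }
}
```

For the second example above, `ErrorSummary.oneLine("Error in fetch", new java.net.SocketTimeoutException())` yields `Error in fetch: java.net.SocketTimeoutException`. The full trace could still be logged at a lower level such as DEBUG if it is ever needed.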

          Ivan Lyutov added a comment -

          Created reviewboard https://reviews.apache.org/r/20030/
          against branch apache/trunk

          Neha Narkhede added a comment -

          Ivan Lyutov, thanks for the patch! I think it will be easier to review if we also attach the server.log and controller.log for each of the following cases:
          1. Logs during broker restart with no topics in the cluster
          2. Logs during topic creation
          3. Logs during send/receive on a list of topics
          4. Logs during leader failover
          5. Logs during controller failover

          Ivan Lyutov added a comment -

          Updated reviewboard https://reviews.apache.org/r/20030/
          against branch apache/trunk

          Jun Rao added a comment -

          Thanks for the patch. +1 and committed to trunk after rebasing.

          Guozhang Wang added a comment -

          Created reviewboard https://reviews.apache.org/r/20240/
          against branch origin/trunk

          Joel Koshy added a comment -

          Since there is a follow-up review, we should either keep this open or file a separate follow-up JIRA. (It is extremely easy to lose track of RBs if there is no corresponding open ticket.)

          Neha Narkhede added a comment -

          Guozhang Wang, I think it will be easier to review if we also attach the server.log and controller.log for each of the following cases:
          1. Logs during broker restart with no topics in the cluster
          2. Logs during topic creation
          3. Logs during send/receive on a list of topics
          4. Logs during leader failover
          5. Logs during controller failover

          Joel Koshy added a comment -

          +1 on having example output to help review. It would be useful to have a
          before/after view to see the effect of these changes. The same goes for the
          suggestions you have made with regard to coding conventions for exception
          handling. If we all agree on those, we can incorporate them into
          http://kafka.apache.org/coding-guide.html

          Jun Rao added a comment -

          We should probably log those in a single line.
          2014/04/11 21:05:32.481 INFO [LogCleaner] [kafka-log-cleaner-thread-0] [kafka-server] [] [kafka-log-cleaner-thread-0],
          Log cleaner thread 0 cleaned log xxx-7 (dirty section = [4820664, 8801949])
          3,137.9 MB of log processed in 35.3 seconds (88.9 MB/sec).
          Indexed 3,137.9 MB in 29.2 seconds (107.5 Mb/sec, 82.7% of total time)
          Buffer utilization: 9.1%
          Cleaned 3,137.9 MB in 6.1 seconds (512.6 Mb/sec, 17.3% of total time)
          Start size: 3,137.9 MB (3,981,285 messages)
          End size: 135.9 MB (1,978,678 messages)
          95.7% size reduction (50.3% fewer messages)

          2014/04/11 21:05:32.487 INFO [LogCleaner] [kafka-log-cleaner-thread-0] [kafka-server] [] Cleaner 0: Beginning cleaning of log xxx-37.
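
One way to get such a message onto a single line, sketched under the assumption that the statistics are first built as a multi-line string and then flattened before logging. `SingleLineMessage` and `collapse` are hypothetical names, not existing Kafka code, and the `"; "` separator is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: flattens a multi-line message so it is logged as
// one line and can be matched with a single grep.
final class SingleLineMessage {
    private SingleLineMessage() {}

    /** Trims each line, drops empty ones, and joins the rest with "; ". */
    static String collapse(String multiLine) {
        List<String> parts = new ArrayList<>();
        for (String line : multiLine.split("\\R")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                parts.add(trimmed);
            }
        }
        return String.join("; ", parts);
    }
}
```

Applied to the cleaner statistics above, this would turn the indented lines into one record of the form `Buffer utilization: 9.1%; Start size: 3,137.9 MB (3,981,285 messages); ...`.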

          Joel Koshy added a comment -

          In this specific case, given that there are only a few fields, I think it is reasonable to have it on a single line, but we should also have a coding guideline on what is too much to put on a single line. E.g., see KAFKA-1122.
          That is, I think there are cases where we probably should split (even if it makes grep harder). I wonder whether log4j, at least in its latest version, has an option to split large messages while putting the same timestamp on each part.
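
Whether log4j offers this is left open here. As a hand-rolled sketch of the idea only, the following splits a long message into fixed-size parts and stamps each part with the same timestamp. `ChunkedMessage`, `split`, and the `[part i/n]` marker are assumptions made for illustration, not a log4j feature.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits one large message into several log lines
// that all carry the same timestamp, so the parts can be correlated.
final class ChunkedMessage {
    private ChunkedMessage() {}

    /** Cuts message into pieces of at most maxChars and prefixes each with timestamp and part index. */
    static List<String> split(String timestamp, String message, int maxChars) {
        if (maxChars <= 0) {
            throw new IllegalArgumentException("maxChars must be positive");
        }
        // Ceiling division; an empty message still produces one (empty) part.
        int total = Math.max(1, (message.length() + maxChars - 1) / maxChars);
        List<String> lines = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int start = i * maxChars;
            int end = Math.min(message.length(), start + maxChars);
            lines.add(String.format("%s [part %d/%d] %s",
                timestamp, i + 1, total, message.substring(start, end)));
        }
        return lines;
    }
}
```

A grep on the timestamp then returns every part of the message, which keeps much of the benefit of single-line logging while bounding line length.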

          Guozhang Wang added a comment (edited) -

          The proposed RB mainly reduces redundant connection-exception logs on both the server and the producer when one peer server in the cluster is offline. I also ran the experiments Neha suggested; here are the results (log4j level set to INFO):

          1. Broker restart with no topics in the cluster:

          Server:

          [2014-04-11 15:17:41,450] INFO Verifying properties (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,490] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,490] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,490] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,490] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,490] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,491] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,492] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,492] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,492] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,492] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
          [2014-04-11 15:17:41,507] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
          [2014-04-11 15:17:41,509] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
          [2014-04-11 15:17:41,520] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
          [2014-04-11 15:17:41,526] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:host.name=guwang-ld2.linkedin.biz (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.version=1.6.0_38-ea (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.home=/usr/java/jdk1.6.0_38/jre (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.class.path=:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/jopt-simple-3.2.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/log4j-1.2.15.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/metrics-core-2.2.0.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/scala-library-2.8.0.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/slf4j-api-1.7.6.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/slf4j-log4j12-1.7.6.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/snappy-java-1.0.5.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/zkclient-0.3.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/dependant-libs-2.8.0/zookeeper-3.3.4.jar:/home/guwang/Workspace/apache/kafka/bin/../perf/build/libs//kafka-perf_2.8.0-0.8.1.jar:/home/guwang/Workspace/apache/kafka/bin/../examples/build/libs//kafka-examples-0.8.1.jar:/home/guwang/Workspace/apache/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer-0.8.1.jar:/home/guwang/Workspace/apache/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer-0.8.1.jar:/home/guwang/Workspace/apache/kafka/bin/../clients/build/libs/kafka-clients-0.8.1.jar:/home/guwang/Workspace/apache/kafka/bin/../libs/*.jar:/home/guwang/Workspace/apache/kafka/bin/../core/build/libs/kafka_2.8.0-0.8.1.jar (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.library.path=/usr/java/jdk1.6.0_38/jre/lib/amd64/server:/usr/java/jdk1.6.0_38/jre/lib/amd64:/usr/java/jdk1.6.0_38/jre/../lib/amd64:/export/apps/xtools/oracle-instant-client_10.2:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,526] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Client environment:os.version=2.6.32-131.4.1.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Client environment:user.name=guwang (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Client environment:user.home=/home/guwang (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Client environment:user.dir=/home/guwang/Workspace/apache/kafka (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,527] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6a6779e6 (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 15:17:41,546] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
          [2014-04-11 15:17:41,551] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
          [2014-04-11 15:17:41,562] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14552dde3620001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
          [2014-04-11 15:17:41,565] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
          [2014-04-11 15:17:41,647] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/tmp/kafka-logs' (kafka.log.LogManager)
          [2014-04-11 15:17:41,648] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
          [2014-04-11 15:17:41,652] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
          [2014-04-11 15:17:41,694] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
          [2014-04-11 15:17:41,695] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
          [2014-04-11 15:17:41,793] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
          [2014-04-11 15:17:41,840] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
          [2014-04-11 15:17:41,955] INFO Registered broker 0 at path /brokers/ids/0 with address guwang-ld2.linkedin.biz:9092. (kafka.utils.ZkUtils$)
          [2014-04-11 15:17:41,983] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
          [2014-04-11 15:17:42,039] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

          Controller:

          [2014-04-11 15:17:24,520] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@7aab853b, name=log4j:logger=kafka.controller (kafka.controller)
          [2014-04-11 15:17:30,898] INFO [Controller 0]: Controller starting up (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,932] INFO [Controller 0]: Broker 0 starting become controller state transition (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,946] INFO [Controller 0]: Controller 0 incremented epoch to 1 (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,982] INFO [Controller 0]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,983] INFO [Controller 0]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,983] INFO [Controller 0]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,988] INFO [Controller 0]: Partitions being reassigned: Map() (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,988] INFO [Controller 0]: Partitions already reassigned: List() (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,989] INFO [Controller 0]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,992] INFO [Controller 0]: List of topics to be deleted: (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,992] INFO [Controller 0]: List of topics ineligible for deletion: (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,996] INFO [Controller 0]: Currently active brokers in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,997] INFO [Controller 0]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:30,997] INFO [Controller 0]: Current list of topics in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:31,000] INFO [Replica state machine on controller 0]: Started replica state machine with initial state -> Map() (kafka.controller.ReplicaStateMachine)
          [2014-04-11 15:17:31,005] INFO [Partition state machine on Controller 0]: Started partition state machine with initial state -> Map() (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:17:31,006] INFO [Controller 0]: Broker 0 is ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
          [2014-04-11 15:17:31,007] INFO [Controller 0]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
          [2014-04-11 15:17:31,008] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:17:31,019] INFO [Controller 0]: Controller startup complete (kafka.controller.KafkaController)
          [2014-04-11 15:17:31,133] DEBUG [ControllerEpochListener on 0]: Controller epoch listener fired with new epoch 1 (kafka.controller.ControllerEpochListener)
          [2014-04-11 15:17:31,144] INFO [ControllerEpochListener on 0]: Initialized controller epoch to 1 and zk version 0 (kafka.controller.ControllerEpochListener)
          [2014-04-11 15:17:31,148] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:31,203] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 0, deleted brokers: , all live brokers: 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:31,205] DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 0 (kafka.controller.ControllerChannelManager)
          [2014-04-11 15:17:31,210] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:guwang-ld2.linkedin.biz,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:31,212] INFO [Controller-0-to-broker-0-send-thread], Starting (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:31,212] INFO [Controller 0]: New broker startup callback for 0 (kafka.controller.KafkaController)
          [2014-04-11 15:17:36,049] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:36,050] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: 0, all live brokers: (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:36,059] INFO [Controller-0-to-broker-0-send-thread], Shutting down (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:36,060] INFO [Controller-0-to-broker-0-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:36,061] INFO [Controller-0-to-broker-0-send-thread], Stopped (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:36,061] INFO [Controller 0]: Broker failure callback for 0 (kafka.controller.KafkaController)
          [2014-04-11 15:17:36,064] INFO [Controller 0]: Removed List() from list of shutting down brokers. (kafka.controller.KafkaController)
          [2014-04-11 15:17:36,067] INFO [Partition state machine on Controller 0]: Invoking state change to OfflinePartition for partitions (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:17:41,755] INFO [ControllerEpochListener on 0]: Initialized controller epoch to 1 and zk version 0 (kafka.controller.ControllerEpochListener)
          [2014-04-11 15:17:41,795] INFO [Controller 0]: Controller starting up (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,841] INFO [Controller 0]: Broker 0 starting become controller state transition (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,852] INFO [Controller 0]: Controller 0 incremented epoch to 2 (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,884] INFO [Controller 0]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,885] INFO [Controller 0]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,885] INFO [Controller 0]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,889] INFO [Controller 0]: Partitions being reassigned: Map() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,890] INFO [Controller 0]: Partitions already reassigned: List() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,891] INFO [Controller 0]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,894] INFO [Controller 0]: List of topics to be deleted: (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,894] INFO [Controller 0]: List of topics ineligible for deletion: (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,898] INFO [Controller 0]: Currently active brokers in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,898] INFO [Controller 0]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,899] INFO [Controller 0]: Current list of topics in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,902] INFO [Replica state machine on controller 0]: Started replica state machine with initial state -> Map() (kafka.controller.ReplicaStateMachine)
          [2014-04-11 15:17:41,906] INFO [Partition state machine on Controller 0]: Started partition state machine with initial state -> Map() (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:17:41,908] INFO [Controller 0]: Broker 0 is ready to serve as the new controller with epoch 2 (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,909] INFO [Controller 0]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
          [2014-04-11 15:17:41,910] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:17:41,920] INFO [Controller 0]: Controller startup complete (kafka.controller.KafkaController)
          [2014-04-11 15:17:42,042] DEBUG [ControllerEpochListener on 0]: Controller epoch listener fired with new epoch 2 (kafka.controller.ControllerEpochListener)
          [2014-04-11 15:17:42,045] INFO [ControllerEpochListener on 0]: Initialized controller epoch to 2 and zk version 1 (kafka.controller.ControllerEpochListener)
          [2014-04-11 15:17:42,049] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:42,103] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 0, deleted brokers: , all live brokers: 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 15:17:42,104] DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 0 (kafka.controller.ControllerChannelManager)
          [2014-04-11 15:17:42,109] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:guwang-ld2.linkedin.biz,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:42,111] INFO [Controller-0-to-broker-0-send-thread], Starting (kafka.controller.RequestSendThread)
          [2014-04-11 15:17:42,111] INFO [Controller 0]: New broker startup callback for 0 (kafka.controller.KafkaController)

          2. Topic creation

          Server:

          [2014-04-11 15:19:46,428] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
          [2014-04-11 15:19:46,455] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
          [2014-04-11 15:19:46,460] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
          [2014-04-11 15:19:46,461] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)

          State-change:

          [2014-04-11 15:17:24,517] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@7aab853b, name=log4j:logger=state.change.logger (state.change.logger)
          [2014-04-11 15:19:46,327] TRACE Controller 0 epoch 2 changed partition [test,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)
          [2014-04-11 15:19:46,333] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [test,0] from NonExistentReplica to NewReplica (state.change.logger)
          [2014-04-11 15:19:46,365] TRACE Controller 0 epoch 2 changed partition [test,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)
          [2014-04-11 15:19:46,374] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 7 to broker 0 for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,377] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 7 to broker 0 for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,380] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [test,0] from NewReplica to OnlineReplica (state.change.logger)
          [2014-04-11 15:19:46,408] TRACE Broker 0 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) correlation id 7 from controller 0 epoch 2 for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,418] TRACE Broker 0 handling LeaderAndIsr request correlationId 7 from controller 0 epoch 2 starting the become-leader transition for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,429] TRACE Broker 0 stopped fetchers as part of become-leader request from controller 0 epoch 2 with correlation id 7 for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,473] TRACE Broker 0 completed LeaderAndIsr request correlationId 7 from controller 0 epoch 2 for the become-leader transition for partition [test,0] (state.change.logger)
          [2014-04-11 15:19:46,486] TRACE Controller 0 epoch 2 received response correlationId 7 for a request sent to broker id:0,host:guwang-ld2.linkedin.biz,port:9092 (state.change.logger)
          [2014-04-11 15:19:46,509] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) for partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 2 with correlation id 7 (state.change.logger)
          [2014-04-11 15:19:46,511] TRACE Controller 0 epoch 2 received response correlationId 7 for a request sent to broker id:0,host:guwang-ld2.linkedin.biz,port:9092 (state.change.logger)

          Controller:

          [2014-04-11 15:19:46,294] DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children test (kafka.controller.PartitionStateMachine$TopicChangeListener)
          [2014-04-11 15:19:46,312] INFO [TopicChangeListener on Controller 0]: New topics: [Set(test)], deleted topics: [Set()], new partition replica assignment [Map([test,0] -> List(0))] (kafka.controller.PartitionStateMachine$TopicChangeListener)
          [2014-04-11 15:19:46,313] INFO [Controller 0]: New topic creation callback for [test,0] (kafka.controller.KafkaController)
          [2014-04-11 15:19:46,316] INFO [Controller 0]: New partition creation callback for [test,0] (kafka.controller.KafkaController)
          [2014-04-11 15:19:46,317] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [test,0] (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:19:46,330] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
          [2014-04-11 15:19:46,333] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:19:46,335] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [test,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:19:46,337] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [test,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
          [2014-04-11 15:19:46,377] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
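
          To put rough numbers on dumps like the ones above, the entries can be tallied by level and logger. This is only a minimal sketch, assuming every entry follows the "[timestamp] LEVEL message (logger)" layout seen here; the `tally` helper and the sample list are illustrative and not part of any patch attached to this ticket.

```python
import re
from collections import Counter

# Assumed layout: "[timestamp] LEVEL message (logger)", as in the dumps above.
LINE_RE = re.compile(
    r"^\[(?P<ts>[^\]]+)\]\s+(?P<level>[A-Z]+)\s+(?P<msg>.*)\s+\((?P<logger>[^()\s]+)\)\s*$"
)


def tally(lines):
    """Count entries per (level, logger); lines that do not match are skipped."""
    counts = Counter()
    for line in lines:
        match = LINE_RE.match(line.strip())
        if match:
            counts[(match.group("level"), match.group("logger"))] += 1
    return counts


# Three entries copied from the topic-creation controller log above.
sample = [
    "[2014-04-11 15:19:46,313] INFO [Controller 0]: New topic creation callback for [test,0] (kafka.controller.KafkaController)",
    "[2014-04-11 15:19:46,316] INFO [Controller 0]: New partition creation callback for [test,0] (kafka.controller.KafkaController)",
    "[2014-04-11 15:19:46,335] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [test,0] are: [List(0)] (kafka.controller.PartitionStateMachine)",
]

# Print the noisiest (level, logger) pairs first.
for (level, logger), count in tally(sample).most_common():
    print(level, logger, count)
```

          Entries that were split across lines when pasted do not match the pattern and are simply skipped, so the counts are a lower bound.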

          3. Send messages: almost no logs, apart from one entry in kafka-request.

          4. Receive messages: almost no logs in kafka-request, plus the following error when the consumer was closed:

          [2014-04-11 16:59:12,262] ERROR Closing socket for /127.0.0.1 because of error java.io.IOException: Connection reset by peer (kafka.network.Processor)

          5. Server shutdown:

          Shutting-down Server:

          [2014-04-11 17:04:14,041] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
          [2014-04-11 17:04:14,054] INFO Deregistered broker 0 at path /brokers/ids/0. (kafka.utils.ZkUtils$)
          [2014-04-11 17:04:14,056] INFO [Socket Server on Broker 0], Shutting down (kafka.network.SocketServer)
          [2014-04-11 17:04:14,061] INFO [Socket Server on Broker 0], Shutdown completed (kafka.network.SocketServer)
          [2014-04-11 17:04:14,062] INFO [Kafka Request Handler on Broker 0], shutting down (kafka.server.KafkaRequestHandlerPool)
          [2014-04-11 17:04:14,066] INFO [Kafka Request Handler on Broker 0], shut down completely (kafka.server.KafkaRequestHandlerPool)
          [2014-04-11 17:04:14,256] INFO [Replica Manager on Broker 0]: Shut down (kafka.server.ReplicaManager)
          [2014-04-11 17:04:14,256] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager)
          [2014-04-11 17:04:14,258] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager)
          [2014-04-11 17:04:14,258] INFO [Replica Manager on Broker 0]: Shut down completely (kafka.server.ReplicaManager)
          [2014-04-11 17:04:14,259] INFO Shutting down. (kafka.log.LogManager)
          [2014-04-11 17:04:14,278] INFO Shutdown complete. (kafka.log.LogManager)
          [2014-04-11 17:04:14,280] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
          [2014-04-11 17:04:14,289] INFO Session: 0x14552dde3620001 closed (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 17:04:14,289] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
          [2014-04-11 17:04:14,290] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)

          Remaining Server:

          [2014-04-11 17:04:14,280] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
          [2014-04-11 17:04:14,301] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
          [2014-04-11 17:04:14,469] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

          Controller:

          [2014-04-11 17:04:14,055] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 17:04:14,079] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: 0, all live brokers: 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 17:04:14,081] INFO [Controller-0-to-broker-0-send-thread], Shutting down (kafka.controller.RequestSendThread)
          [2014-04-11 17:04:14,082] INFO [Controller-0-to-broker-0-send-thread], Stopped (kafka.controller.RequestSendThread)
          [2014-04-11 17:04:14,082] INFO [Controller-0-to-broker-0-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
          [2014-04-11 17:04:14,083] INFO [Controller 0]: Broker failure callback for 0 (kafka.controller.KafkaController)
          [2014-04-11 17:04:14,083] INFO [Controller 0]: Removed List() from list of shutting down brokers. (kafka.controller.KafkaController)
          [2014-04-11 17:04:14,084] INFO [Partition state machine on Controller 0]: Invoking state change to OfflinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine)
          [2014-04-11 17:04:14,116] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [test,0]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
          [2014-04-11 17:04:14,120] INFO [Replica state machine on controller 0]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
          [2014-04-11 17:04:14,124] DEBUG [Controller 0]: Removing replica 0 from ISR 0 for partition [test,0]. (kafka.controller.KafkaController)
          [2014-04-11 17:04:14,146] INFO [Controller 0]: New leader and ISR for partition [test,0] is

          Unknown macro: {"leader"}

          (kafka.controller.KafkaController)
          [2014-04-11 17:04:14,152] DEBUG The stop replica request (delete = true) sent to broker 0 is (kafka.controller.ControllerBrokerRequestBatch)
          [2014-04-11 17:04:14,153] DEBUG The stop replica request (delete = false) sent to broker 0 is [Topic=test,Partition=0,Replica=0] (kafka.controller.ControllerBrokerRequestBatch)
          [2014-04-11 17:04:14,158] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 13; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 2; Partitions: [test,0] to broker 0, since it is offline. (kafka.controller.ControllerChannelManager)
          [2014-04-11 17:04:14,279] INFO [Controller-0-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
          [2014-04-11 17:04:14,280] INFO [Controller-0-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
          [2014-04-11 17:04:14,280] INFO [Controller-0-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)

          6. Controlled server shutdown:

          Shutting down server:

          [2014-04-11 17:14:37,204] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
          [2014-04-11 17:14:37,206] INFO [Kafka Server 0], Starting controlled shutdown (kafka.server.KafkaServer)
          [2014-04-11 17:14:37,259] INFO [Kafka Server 0], Controlled shutdown succeeded (kafka.server.KafkaServer)
          [2014-04-11 17:14:37,259] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
          [2014-04-11 17:14:37,270] INFO Deregistered broker 0 at path /brokers/ids/0. (kafka.utils.ZkUtils$)
          [2014-04-11 17:14:37,271] INFO [Socket Server on Broker 0], Shutting down (kafka.network.SocketServer)
          [2014-04-11 17:14:37,277] INFO [Socket Server on Broker 0], Shutdown completed (kafka.network.SocketServer)
          [2014-04-11 17:14:37,279] INFO [Kafka Request Handler on Broker 0], shutting down (kafka.server.KafkaRequestHandlerPool)
          [2014-04-11 17:14:37,281] INFO [Kafka Request Handler on Broker 0], shut down completely (kafka.server.KafkaRequestHandlerPool)
          [2014-04-11 17:14:37,679] INFO [Replica Manager on Broker 0]: Shut down (kafka.server.ReplicaManager)
          [2014-04-11 17:14:37,680] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager)
          [2014-04-11 17:14:37,681] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager)
          [2014-04-11 17:14:37,683] INFO [Replica Manager on Broker 0]: Shut down completely (kafka.server.ReplicaManager)
          [2014-04-11 17:14:37,684] INFO Shutting down. (kafka.log.LogManager)
          [2014-04-11 17:14:37,692] INFO Shutdown complete. (kafka.log.LogManager)
          [2014-04-11 17:14:37,696] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
          [2014-04-11 17:14:37,704] INFO Session: 0x14552dde3620007 closed (org.apache.zookeeper.ZooKeeper)
          [2014-04-11 17:14:37,704] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
          [2014-04-11 17:14:37,704] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)

          Controller failover:

          [2014-04-11 17:14:37,231] INFO [Controller 0]: Shutting down broker 0 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,232] DEBUG [Controller 0]: All shutting down brokers: 0 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,232] DEBUG [Controller 0]: Live brokers: 1 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,236] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine)
          [2014-04-11 17:14:37,255] TRACE [Controller 0]: All leaders = [test,0] -> (Leader:0,ISR:0,LeaderEpoch:2,ControllerEpoch:3) (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,271] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 17:14:37,285] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: 0, all live brokers: 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
          [2014-04-11 17:14:37,286] INFO [Controller-0-to-broker-0-send-thread], Shutting down (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,287] INFO [Controller-0-to-broker-0-send-thread], Stopped (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,287] INFO [Controller-0-to-broker-0-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,288] INFO [Controller 0]: Broker failure callback for 0 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,289] INFO [Controller 0]: Removed List(0) from list of shutting down brokers. (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,290] INFO [Partition state machine on Controller 0]: Invoking state change to OfflinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine)
          [2014-04-11 17:14:37,305] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [test,0]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
          [2014-04-11 17:14:37,308] INFO [Replica state machine on controller 0]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
          [2014-04-11 17:14:37,312] DEBUG [Controller 0]: Removing replica 0 from ISR 0 for partition [test,0]. (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,325] INFO [Controller 0]: New leader and ISR for partition [test,0] is

          Unknown macro: {"leader"}

          (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,329] DEBUG The stop replica request (delete = true) sent to broker 0 is (kafka.controller.ControllerBrokerRequestBatch)
          [2014-04-11 17:14:37,329] DEBUG The stop replica request (delete = false) sent to broker 0 is [Topic=test,Partition=0,Replica=0] (kafka.controller.ControllerBrokerRequestBatch)
          [2014-04-11 17:14:37,333] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 9; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 4; Partitions: [test,0] to broker 0, since it is offline. (kafka.controller.ControllerChannelManager)
          [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,713] INFO [Controller 1]: Broker 1 starting become controller state transition (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,721] INFO [Controller 1]: Controller 1 incremented epoch to 5 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,787] DEBUG [Channel manager on controller 1]: Controller 1 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
          [2014-04-11 17:14:37,792] INFO [Controller-1-to-broker-1-send-thread], Controller 1 connected to id:1,host:guwang-ld2.linkedin.biz,port:1999 for sending state change requests (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,794] INFO [Controller-1-to-broker-1-send-thread], Starting (kafka.controller.RequestSendThread)
          [2014-04-11 17:14:37,796] INFO [Controller 1]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,797] INFO [Controller 1]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,797] INFO [Controller 1]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,800] INFO [Controller 1]: Partitions being reassigned: Map() (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,800] INFO [Controller 1]: Partitions already reassigned: List() (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,801] INFO [Controller 1]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,806] INFO [Controller 1]: List of topics to be deleted: (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,806] INFO [Controller 1]: List of topics ineligible for deletion: test (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,809] INFO [Controller 1]: Currently active brokers in the cluster: Set(1) (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,809] INFO [Controller 1]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,810] INFO [Controller 1]: Current list of topics in the cluster: Set(test) (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,815] INFO [Replica state machine on controller 1]: Started replica state machine with initial state -> Map([Topic=test,Partition=0,Replica=0] -> ReplicaDeletionIneligible) (kafka.controller.ReplicaStateMachine)
          [2014-04-11 17:14:37,839] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [test,0]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector)
          [2014-04-11 17:14:37,845] INFO [Partition state machine on Controller 1]: Started partition state machine with initial state -> Map([test,0] -> OfflinePartition) (kafka.controller.PartitionStateMachine)
          [2014-04-11 17:14:37,848] INFO [Controller 1]: Broker 1 is ready to serve as the new controller with epoch 5 (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,849] INFO [Controller 1]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController)
          [2014-04-11 17:14:37,850] INFO [Partition state machine on Controller 1]: Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine)
          [2014-04-11 17:14:37,901] DEBUG [ControllerEpochListener on 1]: Controller epoch listener fired with new epoch 5 (kafka.controller.ControllerEpochListener)
          [2014-04-11 17:14:37,903] INFO [ControllerEpochListener on 1]: Initialized controller epoch to 5 and zk version 4 (kafka.controller.ControllerEpochListener)

(kafka.controller.KafkaController) [2014-04-11 15:17:36,067] INFO [Partition state machine on Controller 0] : Invoking state change to OfflinePartition for partitions (kafka.controller.PartitionStateMachine) [2014-04-11 15:17:41,755] INFO [ControllerEpochListener on 0] : Initialized controller epoch to 1 and zk version 0 (kafka.controller.ControllerEpochListener) [2014-04-11 15:17:41,795] INFO [Controller 0] : Controller starting up (kafka.controller.KafkaController) [2014-04-11 15:17:41,841] INFO [Controller 0] : Broker 0 starting become controller state transition (kafka.controller.KafkaController) [2014-04-11 15:17:41,852] INFO [Controller 0] : Controller 0 incremented epoch to 2 (kafka.controller.KafkaController) [2014-04-11 15:17:41,884] INFO [Controller 0] : Partitions undergoing preferred replica election: (kafka.controller.KafkaController) [2014-04-11 15:17:41,885] INFO [Controller 0] : Partitions that completed preferred replica election: (kafka.controller.KafkaController) [2014-04-11 15:17:41,885] INFO [Controller 0] : Resuming preferred replica election for partitions: (kafka.controller.KafkaController) [2014-04-11 15:17:41,889] INFO [Controller 0] : Partitions being reassigned: Map() (kafka.controller.KafkaController) [2014-04-11 15:17:41,890] INFO [Controller 0] : Partitions already reassigned: List() (kafka.controller.KafkaController) [2014-04-11 15:17:41,891] INFO [Controller 0] : Resuming reassignment of partitions: Map() (kafka.controller.KafkaController) [2014-04-11 15:17:41,894] INFO [Controller 0] : List of topics to be deleted: (kafka.controller.KafkaController) [2014-04-11 15:17:41,894] INFO [Controller 0] : List of topics ineligible for deletion: (kafka.controller.KafkaController) [2014-04-11 15:17:41,898] INFO [Controller 0] : Currently active brokers in the cluster: Set() (kafka.controller.KafkaController) [2014-04-11 15:17:41,898] INFO [Controller 0] : Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) 
[2014-04-11 15:17:41,899] INFO [Controller 0] : Current list of topics in the cluster: Set() (kafka.controller.KafkaController) [2014-04-11 15:17:41,902] INFO [Replica state machine on controller 0] : Started replica state machine with initial state -> Map() (kafka.controller.ReplicaStateMachine) [2014-04-11 15:17:41,906] INFO [Partition state machine on Controller 0] : Started partition state machine with initial state -> Map() (kafka.controller.PartitionStateMachine) [2014-04-11 15:17:41,908] INFO [Controller 0] : Broker 0 is ready to serve as the new controller with epoch 2 (kafka.controller.KafkaController) [2014-04-11 15:17:41,909] INFO [Controller 0] : Starting preferred replica leader election for partitions (kafka.controller.KafkaController) [2014-04-11 15:17:41,910] INFO [Partition state machine on Controller 0] : Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine) [2014-04-11 15:17:41,920] INFO [Controller 0] : Controller startup complete (kafka.controller.KafkaController) [2014-04-11 15:17:42,042] DEBUG [ControllerEpochListener on 0] : Controller epoch listener fired with new epoch 2 (kafka.controller.ControllerEpochListener) [2014-04-11 15:17:42,045] INFO [ControllerEpochListener on 0] : Initialized controller epoch to 2 and zk version 1 (kafka.controller.ControllerEpochListener) [2014-04-11 15:17:42,049] INFO [BrokerChangeListener on Controller 0] : Broker change listener fired for path /brokers/ids with children 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 15:17:42,103] INFO [BrokerChangeListener on Controller 0] : Newly added brokers: 0, deleted brokers: , all live brokers: 0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 15:17:42,104] DEBUG [Channel manager on controller 0] : Controller 0 trying to connect to broker 0 (kafka.controller.ControllerChannelManager) [2014-04-11 15:17:42,109] INFO [Controller-0-to-broker-0-send-thread] , Controller 0 
connected to id:0,host:guwang-ld2.linkedin.biz,port:9092 for sending state change requests (kafka.controller.RequestSendThread) [2014-04-11 15:17:42,111] INFO [Controller-0-to-broker-0-send-thread] , Starting (kafka.controller.RequestSendThread) [2014-04-11 15:17:42,111] INFO [Controller 0] : New broker startup callback for 0 (kafka.controller.KafkaController) 2. Topic creation Server: [2014-04-11 15:19:46,428] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager) [2014-04-11 15:19:46,455] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2014-04-11 15:19:46,460] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties Unknown macro: {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000} . (kafka.log.LogManager) [2014-04-11 15:19:46,461] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition) State-change: [2014-04-11 15:17:24,517] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@7aab853b, name=log4j:logger=state.change.logger (state.change.logger) [2014-04-11 15:19:46,327] TRACE Controller 0 epoch 2 changed partition [test,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger) [2014-04-11 15:19:46,333] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [test,0] from NonExistentReplica to NewReplica (state.change.logger) [2014-04-11 15:19:46,365] TRACE Controller 0 epoch 2 changed partition [test,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger) [2014-04-11 15:19:46,374] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 7 to broker 0 for partition [test,0] (state.change.logger) [2014-04-11 15:19:46,377] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 7 to broker 0 for partition [test,0] (state.change.logger) [2014-04-11 15:19:46,380] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [test,0] from NewReplica to OnlineReplica (state.change.logger) [2014-04-11 15:19:46,408] TRACE Broker 0 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) correlation id 7 from controller 0 epoch 2 for partition [test,0] (state.change.logger) [2014-04-11 15:19:46,418] TRACE Broker 0 handling LeaderAndIsr request correlationId 7 from controller 0 epoch 2 starting the become-leader transition for partition [test,0] (state.change.logger) [2014-04-11 15:19:46,429] TRACE Broker 0 stopped fetchers as part of become-leader request from controller 0 epoch 2 with correlation id 7 for partition [test,0] (state.change.logger) [2014-04-11 15:19:46,473] TRACE Broker 0 completed LeaderAndIsr request correlationId 7 from controller 0 epoch 2 for the become-leader transition for 
partition [test,0] (state.change.logger) [2014-04-11 15:19:46,486] TRACE Controller 0 epoch 2 received response correlationId 7 for a request sent to broker id:0,host:guwang-ld2.linkedin.biz,port:9092 (state.change.logger) [2014-04-11 15:19:46,509] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) for partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 2 with correlation id 7 (state.change.logger) [2014-04-11 15:19:46,511] TRACE Controller 0 epoch 2 received response correlationId 7 for a request sent to broker id:0,host:guwang-ld2.linkedin.biz,port:9092 (state.change.logger) Controller: [2014-04-11 15:19:46,294] DEBUG [TopicChangeListener on Controller 0] : Topic change listener fired for path /brokers/topics with children test (kafka.controller.PartitionStateMachine$TopicChangeListener) [2014-04-11 15:19:46,312] INFO [TopicChangeListener on Controller 0] : New topics: [Set(test)] , deleted topics: [Set()] , new partition replica assignment [Map( [test,0] -> List(0))] (kafka.controller.PartitionStateMachine$TopicChangeListener) [2014-04-11 15:19:46,313] INFO [Controller 0] : New topic creation callback for [test,0] (kafka.controller.KafkaController) [2014-04-11 15:19:46,316] INFO [Controller 0] : New partition creation callback for [test,0] (kafka.controller.KafkaController) [2014-04-11 15:19:46,317] INFO [Partition state machine on Controller 0] : Invoking state change to NewPartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-11 15:19:46,330] INFO [Replica state machine on controller 0] : Invoking state change to NewReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-11 15:19:46,333] INFO [Partition state machine on Controller 0] : Invoking state change to OnlinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-11 15:19:46,335] 
DEBUG [Partition state machine on Controller 0] : Live assigned replicas for partition [test,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-04-11 15:19:46,337] DEBUG [Partition state machine on Controller 0] : Initializing leader and isr for partition [test,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-04-11 15:19:46,377] INFO [Replica state machine on controller 0] : Invoking state change to OnlineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) 3. Send messages: nearly no logs except one in kafka-request. 4. Receive messages: nearly no logs in kafka-request and [2014-04-11 16:59:12,262] ERROR Closing socket for /127.0.0.1 because of error java.io.IOException: Connection reset by peer (kafka.network.Processor) When consumer closed. 5. Server shutdown: Shutting-down Server: [2014-04-11 17:04:14,041] INFO [Kafka Server 0] , shutting down (kafka.server.KafkaServer) [2014-04-11 17:04:14,054] INFO Deregistered broker 0 at path /brokers/ids/0. 
(kafka.utils.ZkUtils$) [2014-04-11 17:04:14,056] INFO [Socket Server on Broker 0] , Shutting down (kafka.network.SocketServer) [2014-04-11 17:04:14,061] INFO [Socket Server on Broker 0] , Shutdown completed (kafka.network.SocketServer) [2014-04-11 17:04:14,062] INFO [Kafka Request Handler on Broker 0] , shutting down (kafka.server.KafkaRequestHandlerPool) [2014-04-11 17:04:14,066] INFO [Kafka Request Handler on Broker 0] , shut down completely (kafka.server.KafkaRequestHandlerPool) [2014-04-11 17:04:14,256] INFO [Replica Manager on Broker 0] : Shut down (kafka.server.ReplicaManager) [2014-04-11 17:04:14,256] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager) [2014-04-11 17:04:14,258] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager) [2014-04-11 17:04:14,258] INFO [Replica Manager on Broker 0] : Shut down completely (kafka.server.ReplicaManager) [2014-04-11 17:04:14,259] INFO Shutting down. (kafka.log.LogManager) [2014-04-11 17:04:14,278] INFO Shutdown complete. (kafka.log.LogManager) [2014-04-11 17:04:14,280] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2014-04-11 17:04:14,289] INFO Session: 0x14552dde3620001 closed (org.apache.zookeeper.ZooKeeper) [2014-04-11 17:04:14,289] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn) [2014-04-11 17:04:14,290] INFO [Kafka Server 0] , shut down completed (kafka.server.KafkaServer) Remaining Server: [2014-04-11 17:04:14,280] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor) [2014-04-11 17:04:14,301] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-04-11 17:04:14,469] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) Controller: [2014-04-11 17:04:14,055] INFO [BrokerChangeListener on Controller 0] : Broker change listener fired for path /brokers/ids with children 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 17:04:14,079] INFO [BrokerChangeListener on Controller 0] : Newly added brokers: , deleted brokers: 0, all live brokers: 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 17:04:14,081] INFO [Controller-0-to-broker-0-send-thread] , Shutting down (kafka.controller.RequestSendThread) [2014-04-11 17:04:14,082] INFO [Controller-0-to-broker-0-send-thread] , Stopped (kafka.controller.RequestSendThread) [2014-04-11 17:04:14,082] INFO [Controller-0-to-broker-0-send-thread] , Shutdown completed (kafka.controller.RequestSendThread) [2014-04-11 17:04:14,083] INFO [Controller 0] : Broker failure callback for 0 (kafka.controller.KafkaController) [2014-04-11 17:04:14,083] INFO [Controller 0] : Removed List() from list of shutting down brokers. (kafka.controller.KafkaController) [2014-04-11 17:04:14,084] INFO [Partition state machine on Controller 0] : Invoking state change to OfflinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-11 17:04:14,116] DEBUG [OfflinePartitionLeaderSelector] : No broker in ISR is alive for [test,0] . Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector) [2014-04-11 17:04:14,120] INFO [Replica state machine on controller 0] : Invoking state change to OfflineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-11 17:04:14,124] DEBUG [Controller 0] : Removing replica 0 from ISR 0 for partition [test,0] . 
(kafka.controller.KafkaController) [2014-04-11 17:04:14,146] INFO [Controller 0] : New leader and ISR for partition [test,0] is Unknown macro: {"leader"} (kafka.controller.KafkaController) [2014-04-11 17:04:14,152] DEBUG The stop replica request (delete = true) sent to broker 0 is (kafka.controller.ControllerBrokerRequestBatch) [2014-04-11 17:04:14,153] DEBUG The stop replica request (delete = false) sent to broker 0 is [Topic=test,Partition=0,Replica=0] (kafka.controller.ControllerBrokerRequestBatch) [2014-04-11 17:04:14,158] WARN [Channel manager on controller 0] : Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 13; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 2; Partitions: [test,0] to broker 0, since it is offline. (kafka.controller.ControllerChannelManager) [2014-04-11 17:04:14,279] INFO [Controller-0-to-broker-1-send-thread] , Shutting down (kafka.controller.RequestSendThread) [2014-04-11 17:04:14,280] INFO [Controller-0-to-broker-1-send-thread] , Stopped (kafka.controller.RequestSendThread) [2014-04-11 17:04:14,280] INFO [Controller-0-to-broker-1-send-thread] , Shutdown completed (kafka.controller.RequestSendThread) 5. Controlled server shutdown: Shutting down server: [2014-04-11 17:14:37,204] INFO [Kafka Server 0] , shutting down (kafka.server.KafkaServer) [2014-04-11 17:14:37,206] INFO [Kafka Server 0] , Starting controlled shutdown (kafka.server.KafkaServer) [2014-04-11 17:14:37,259] INFO [Kafka Server 0] , Controlled shutdown succeeded (kafka.server.KafkaServer) [2014-04-11 17:14:37,259] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) [2014-04-11 17:14:37,270] INFO Deregistered broker 0 at path /brokers/ids/0. 
(kafka.utils.ZkUtils$) [2014-04-11 17:14:37,271] INFO [Socket Server on Broker 0] , Shutting down (kafka.network.SocketServer) [2014-04-11 17:14:37,277] INFO [Socket Server on Broker 0] , Shutdown completed (kafka.network.SocketServer) [2014-04-11 17:14:37,279] INFO [Kafka Request Handler on Broker 0] , shutting down (kafka.server.KafkaRequestHandlerPool) [2014-04-11 17:14:37,281] INFO [Kafka Request Handler on Broker 0] , shut down completely (kafka.server.KafkaRequestHandlerPool) [2014-04-11 17:14:37,679] INFO [Replica Manager on Broker 0] : Shut down (kafka.server.ReplicaManager) [2014-04-11 17:14:37,680] INFO [ReplicaFetcherManager on broker 0] shutting down (kafka.server.ReplicaFetcherManager) [2014-04-11 17:14:37,681] INFO [ReplicaFetcherManager on broker 0] shutdown completed (kafka.server.ReplicaFetcherManager) [2014-04-11 17:14:37,683] INFO [Replica Manager on Broker 0] : Shut down completely (kafka.server.ReplicaManager) [2014-04-11 17:14:37,684] INFO Shutting down. (kafka.log.LogManager) [2014-04-11 17:14:37,692] INFO Shutdown complete. (kafka.log.LogManager) [2014-04-11 17:14:37,696] INFO Terminate ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread) [2014-04-11 17:14:37,704] INFO Session: 0x14552dde3620007 closed (org.apache.zookeeper.ZooKeeper) [2014-04-11 17:14:37,704] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn) [2014-04-11 17:14:37,704] INFO [Kafka Server 0] , shut down completed (kafka.server.KafkaServer) Controller failover: [2014-04-11 17:14:37,231] INFO [Controller 0] : Shutting down broker 0 (kafka.controller.KafkaController) [2014-04-11 17:14:37,232] DEBUG [Controller 0] : All shutting down brokers: 0 (kafka.controller.KafkaController) [2014-04-11 17:14:37,232] DEBUG [Controller 0] : Live brokers: 1 (kafka.controller.KafkaController) [2014-04-11 17:14:37,236] INFO [Partition state machine on Controller 0] : Invoking state change to OnlinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-11 17:14:37,255] TRACE [Controller 0] : All leaders = [test,0] -> (Leader:0,ISR:0,LeaderEpoch:2,ControllerEpoch:3) (kafka.controller.KafkaController) [2014-04-11 17:14:37,271] INFO [BrokerChangeListener on Controller 0] : Broker change listener fired for path /brokers/ids with children 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 17:14:37,285] INFO [BrokerChangeListener on Controller 0] : Newly added brokers: , deleted brokers: 0, all live brokers: 1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-11 17:14:37,286] INFO [Controller-0-to-broker-0-send-thread] , Shutting down (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,287] INFO [Controller-0-to-broker-0-send-thread] , Stopped (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,287] INFO [Controller-0-to-broker-0-send-thread] , Shutdown completed (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,288] INFO [Controller 0] : Broker failure callback for 0 (kafka.controller.KafkaController) [2014-04-11 17:14:37,289] INFO [Controller 0] : Removed List(0) from list of shutting down brokers. 
(kafka.controller.KafkaController) [2014-04-11 17:14:37,290] INFO [Partition state machine on Controller 0] : Invoking state change to OfflinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-11 17:14:37,305] DEBUG [OfflinePartitionLeaderSelector] : No broker in ISR is alive for [test,0] . Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector) [2014-04-11 17:14:37,308] INFO [Replica state machine on controller 0] : Invoking state change to OfflineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-11 17:14:37,312] DEBUG [Controller 0] : Removing replica 0 from ISR 0 for partition [test,0] . (kafka.controller.KafkaController) [2014-04-11 17:14:37,325] INFO [Controller 0] : New leader and ISR for partition [test,0] is Unknown macro: {"leader"} (kafka.controller.KafkaController) [2014-04-11 17:14:37,329] DEBUG The stop replica request (delete = true) sent to broker 0 is (kafka.controller.ControllerBrokerRequestBatch) [2014-04-11 17:14:37,329] DEBUG The stop replica request (delete = false) sent to broker 0 is [Topic=test,Partition=0,Replica=0] (kafka.controller.ControllerBrokerRequestBatch) [2014-04-11 17:14:37,333] WARN [Channel manager on controller 0] : Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 9; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 4; Partitions: [test,0] to broker 0, since it is offline. 
(kafka.controller.ControllerChannelManager) [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread] , Shutting down (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread] , Stopped (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,695] INFO [Controller-0-to-broker-1-send-thread] , Shutdown completed (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,713] INFO [Controller 1] : Broker 1 starting become controller state transition (kafka.controller.KafkaController) [2014-04-11 17:14:37,721] INFO [Controller 1] : Controller 1 incremented epoch to 5 (kafka.controller.KafkaController) [2014-04-11 17:14:37,787] DEBUG [Channel manager on controller 1] : Controller 1 trying to connect to broker 1 (kafka.controller.ControllerChannelManager) [2014-04-11 17:14:37,792] INFO [Controller-1-to-broker-1-send-thread] , Controller 1 connected to id:1,host:guwang-ld2.linkedin.biz,port:1999 for sending state change requests (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,794] INFO [Controller-1-to-broker-1-send-thread] , Starting (kafka.controller.RequestSendThread) [2014-04-11 17:14:37,796] INFO [Controller 1] : Partitions undergoing preferred replica election: (kafka.controller.KafkaController) [2014-04-11 17:14:37,797] INFO [Controller 1] : Partitions that completed preferred replica election: (kafka.controller.KafkaController) [2014-04-11 17:14:37,797] INFO [Controller 1] : Resuming preferred replica election for partitions: (kafka.controller.KafkaController) [2014-04-11 17:14:37,800] INFO [Controller 1] : Partitions being reassigned: Map() (kafka.controller.KafkaController) [2014-04-11 17:14:37,800] INFO [Controller 1] : Partitions already reassigned: List() (kafka.controller.KafkaController) [2014-04-11 17:14:37,801] INFO [Controller 1] : Resuming reassignment of partitions: Map() (kafka.controller.KafkaController) [2014-04-11 17:14:37,806] INFO [Controller 1] : List of topics to be 
deleted: (kafka.controller.KafkaController) [2014-04-11 17:14:37,806] INFO [Controller 1] : List of topics ineligible for deletion: test (kafka.controller.KafkaController) [2014-04-11 17:14:37,809] INFO [Controller 1] : Currently active brokers in the cluster: Set(1) (kafka.controller.KafkaController) [2014-04-11 17:14:37,809] INFO [Controller 1] : Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) [2014-04-11 17:14:37,810] INFO [Controller 1] : Current list of topics in the cluster: Set(test) (kafka.controller.KafkaController) [2014-04-11 17:14:37,815] INFO [Replica state machine on controller 1] : Started replica state machine with initial state -> Map( [Topic=test,Partition=0,Replica=0] -> ReplicaDeletionIneligible) (kafka.controller.ReplicaStateMachine) [2014-04-11 17:14:37,839] DEBUG [OfflinePartitionLeaderSelector] : No broker in ISR is alive for [test,0] . Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector) [2014-04-11 17:14:37,845] INFO [Partition state machine on Controller 1] : Started partition state machine with initial state -> Map( [test,0] -> OfflinePartition) (kafka.controller.PartitionStateMachine) [2014-04-11 17:14:37,848] INFO [Controller 1] : Broker 1 is ready to serve as the new controller with epoch 5 (kafka.controller.KafkaController) [2014-04-11 17:14:37,849] INFO [Controller 1] : Starting preferred replica leader election for partitions (kafka.controller.KafkaController) [2014-04-11 17:14:37,850] INFO [Partition state machine on Controller 1] : Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine) [2014-04-11 17:14:37,901] DEBUG [ControllerEpochListener on 1] : Controller epoch listener fired with new epoch 5 (kafka.controller.ControllerEpochListener) [2014-04-11 17:14:37,903] INFO [ControllerEpochListener on 1] : Initialized controller epoch to 5 and zk version 4 (kafka.controller.ControllerEpochListener)
          Guozhang Wang added a comment -

          We have this describe() function, which takes a details flag, as the new implementation of RequestOrResponse.toString, but the problem is that we only set the flag to false when we print the state-change log in debug mode.

          So here is my proposal for logging conventions, in addition to http://kafka.apache.org/coding-guide.html:

          1. Always keep log entries single-lined except:

          a. RequestOrResponse.toString(details = true) for produce/fetch requests.
          b. Lists of topic/partitions to consume on startup/rebalance in ZookeeperConsumerConnector.
          c. .. (this list could grow, but we need to keep track)
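
          To illustrate the details flag, here is a minimal Java sketch. FetchSummary and its fields are hypothetical stand-ins rather than the actual Kafka request classes; only the describe(details) idea comes from the comment above, and the exact output format is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for a request class; not Kafka's actual RequestOrResponse.
final class FetchSummary {
    private final int correlationId;
    private final String clientId;
    // Hypothetical per-partition fetch offsets, keyed by "topic,partition".
    private final Map<String, Long> offsets;

    FetchSummary(int correlationId, String clientId, Map<String, Long> offsets) {
        this.correlationId = correlationId;
        this.clientId = clientId;
        this.offsets = new LinkedHashMap<>(offsets);
    }

    // details = false: short summary whose size does not grow with the partition count.
    // details = true: additionally lists every partition and its offset.
    String describe(boolean details) {
        StringBuilder sb = new StringBuilder();
        sb.append("Name: FetchSummary; CorrelationId: ").append(correlationId)
          .append("; ClientId: ").append(clientId)
          .append("; Partitions: ").append(offsets.size());
        if (details) {
            StringJoiner joiner = new StringJoiner(", ", "; Offsets: ", "");
            offsets.forEach((tp, offset) -> joiner.add("[" + tp + "] -> " + offset));
            sb.append(joiner);
        }
        return sb.toString();
    }

    @Override
    public String toString() {
        return describe(true);
    }
}
```

          A caller logging at a noisy level could then pass describe(false) and reserve describe(true) for the produce/fetch exceptions listed above.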

          ----------------

          I am also proposing the following exception handling conventions (copied from the RB):

          1. No stack trace below WARN

          2. For WARN,

          a) Print the stack trace when calling library functions (i.e. non-Kafka functions), or when we caught Throwable and it could be any of various exception types.

          b) Print e.toString (e.Class.Name + ":" + e.Message) when we caught Throwable but can be sure about the possible exception types.

          c) Only print e.Message otherwise.

          3. For ERROR, always print stack trace.
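
          The three rules above could be sketched as a small Java helper. This is only an illustration under stated assumptions: ExceptionRendering, Level, and Knowledge are hypothetical names, and printing only the message below WARN is my assumption, since the proposal only says there is no stack trace at those levels.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper illustrating the proposed conventions; not part of Kafka.
final class ExceptionRendering {
    enum Level { DEBUG, INFO, WARN, ERROR }

    // How much the catch site knows about what it caught (names are assumptions).
    enum Knowledge {
        LIBRARY_OR_UNKNOWN_TYPES, // library call, or Throwable of various types
        KNOWN_TYPES,              // caught Throwable, but the possible types are known
        SPECIFIC_TYPE             // caught one specific exception type
    }

    static String render(Level level, Knowledge knowledge, Throwable t) {
        if (level == Level.ERROR) {
            return stackTrace(t);                          // rule 3
        }
        if (level == Level.WARN) {
            switch (knowledge) {
                case LIBRARY_OR_UNKNOWN_TYPES:
                    return stackTrace(t);                  // rule 2a
                case KNOWN_TYPES:
                    return t.toString();                   // rule 2b
                default:
                    return String.valueOf(t.getMessage()); // rule 2c
            }
        }
        // Rule 1: no stack trace below WARN (message only is an assumption).
        return String.valueOf(t.getMessage());
    }

    // Renders the full stack trace into a String.
    static String stackTrace(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }
}
```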

          There are also some misuses of swallow, which should only be used when "we do not throw more exceptions but just log them with the full stack trace", not "when we really do not care whether it throws any exceptions".

          1. Avoid catching Throwable as much as possible. Only catch Throwable when 1) calling a library function whose thrown exceptions are not clearly defined, or 2) we REALLY want to "swallow" any exceptions thrown. In the second case, we should not log any exceptions thrown.

          2. For our own Scala classes, specify the possible checked Kafka exceptions in the comments as often as possible (for Java classes, do that in the signatures). When catching exceptions, case-enumerate all possible types and log Exception.getMessage.

          3. When logging at WARN or below, do not include the stack trace; when logging at ERROR, only log the stack trace if we are catching Throwable or any runtime exceptions.

          4. Only re-throw the same or another exception type after logging when the function needs to return a value and the thrown exception makes it unable to return one. In this case, throw a specific exception and catch it in the caller without logging it a second time.
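
          As a rough sketch of points 2 and 4, the Java snippet below catches one specific exception type, logs its message once, and re-throws a specific exception that the caller handles without logging again. OffsetParser, InvalidOffsetException, and the in-memory LOG list are all hypothetical and exist only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example; the class, method, and exception names are illustrative only.
final class OffsetParser {
    // Specific exception the caller can catch without logging again.
    static final class InvalidOffsetException extends RuntimeException {
        InvalidOffsetException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for a logger so the sketch stays self-contained.
    static final List<String> LOG = new ArrayList<>();

    // Must return a value, so on failure it logs once and throws a specific type.
    static long parseOffset(String raw) {
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) { // enumerate the expected type, not Throwable
            LOG.add("WARN Invalid offset: " + e.getMessage());
            throw new InvalidOffsetException("Invalid offset '" + raw + "'", e);
        }
    }

    // The caller handles the specific exception and does not log it a second time.
    static long parseOffsetOrDefault(String raw, long fallback) {
        try {
            return parseOffset(raw);
        } catch (InvalidOffsetException e) {
            return fallback;
        }
    }
}
```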

          Jun Rao added a comment -

          In terms of logging, it seems that
          1. If we don't know from where the exception is going to be thrown, we can log the stacktrace.
          2. Otherwise, if we don't know the type of the exception, we can log e.toString
          3. Otherwise, we can log e.getMessage

          Agreed that catching should be as specific as possible. In all cases where we catch Throwable, we just want to log our own bugs (e.g., NullPointerException). However, catching Exception may be enough. We can probably fix this in a separate JIRA.
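
          The three logging cases listed above could be sketched roughly as follows in Java. This is only an illustration: the `ExceptionDetail` class, the `Known` enum and the `render` method are hypothetical names and not part of Kafka; only the `Throwable` methods are standard JDK calls.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class ExceptionDetail {
    // How much is known at the catch site; the names are illustrative only.
    enum Known { NOTHING, ORIGIN_ONLY, ORIGIN_AND_TYPE }

    // Returns the text that would be appended to the log entry.
    static String render(Throwable e, Known known) {
        switch (known) {
            case NOTHING: {
                // Case 1: origin unknown, so keep the full stack trace.
                StringWriter out = new StringWriter();
                PrintWriter writer = new PrintWriter(out);
                e.printStackTrace(writer);
                writer.flush();
                return out.toString();
            }
            case ORIGIN_ONLY:
                // Case 2: origin known but type unknown, so log class name plus message.
                return e.toString();
            default:
                // Case 3: origin and type both known, so the message alone is enough.
                return e.getMessage();
        }
    }
}
```

          For an `IllegalStateException("boom")`, case 3 yields `boom`, case 2 yields `java.lang.IllegalStateException: boom`, and case 1 yields the same first line followed by the stack frames.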

          Guozhang Wang added a comment -

          Another observation from Jon Bringhurst is that sometimes we output Exception.getMessage as null:

          2014/05/06 15:52:10.046 WARN... Possible cause: null
          2014/05/06 15:52:10.047 INFO ... Reconnect due to socket error: null
          

          This can also be improved.
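
          One possible way to avoid the literal "null" is to fall back to the exception's class name when there is no message. The Java sketch below is an assumption about how this could be done, not what any Kafka patch does; `SafeMessage` and `describe` are hypothetical names. It relies only on the fact that `Throwable.getMessage()` returns null for exceptions constructed without a message, such as `new java.io.EOFException()`.

```java
class SafeMessage {
    // Returns text that never degrades to the literal "null" for a message-less exception.
    static String describe(Throwable e) {
        String message = e.getMessage();
        if (message == null || message.isEmpty()) {
            // No message available: the class name is the most useful thing left.
            return e.getClass().getName();
        }
        return e.getClass().getName() + ": " + message;
    }
}
```

          With this helper, a log line such as "Reconnect due to socket error: " + `describe(e)` would name the exception type even when the message is missing.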

          Simon Cooper added a comment -

          Another example is when there's an exception in the ReplicaFetcherThread:

          [2014-05-14 12:33:37,811] WARN [ReplicaFetcherThread-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 36409; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: <queues and PartitionFetchInfos>
          java.net.ConnectException: Connection refused
                  at sun.nio.ch.Net.connect0(Native Method)
                  at sun.nio.ch.Net.connect(Net.java:465)
                  at sun.nio.ch.Net.connect(Net.java:457)
                  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
                  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
                  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
                  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
                  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
                  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
                  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
                  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
                  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
                  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
                  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
                  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
                  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
          [2014-05-14 12:33:37,811] INFO Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
          

          In this case, this error is printed out to the console every millisecond, causing the daemon log file to become very large very quickly.
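
          One possible mitigation for this kind of flood is to throttle repeated messages. The Java sketch below is only an illustration of that idea and is not taken from the attached patches; `ThrottledLog` and `offer` are hypothetical names, and the clock is injected so the behaviour can be checked without sleeping.

```java
import java.util.function.LongSupplier;

class ThrottledLog {
    private final long intervalMs;
    private final LongSupplier clock; // injectable time source, e.g. System::currentTimeMillis
    private long lastEmitMs = 0L;
    private boolean emittedOnce = false;
    private long suppressed = 0L;

    ThrottledLog(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
    }

    // Returns the line to write, or null when the message falls inside the quiet interval.
    synchronized String offer(String message) {
        long now = clock.getAsLong();
        if (emittedOnce && now - lastEmitMs < intervalMs) {
            suppressed++;
            return null;
        }
        // Report how many messages were dropped since the last emitted line.
        String line = suppressed == 0
                ? message
                : message + " (" + suppressed + " similar messages suppressed)";
        emittedOnce = true;
        lastEmitMs = now;
        suppressed = 0L;
        return line;
    }
}
```

          With a one-second interval, a fetch error repeated every millisecond would produce about one log line per second, each carrying the count of suppressed repeats, instead of a full stack trace per attempt.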

          Guozhang Wang added a comment -

          We will fix this issue together with https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging in 0.9.


            People

            • Assignee:
              Ivan Lyutov
              Reporter:
              Neha Narkhede
            • Votes:
              1
              Watchers:
              10
