KAFKA-410

After being killed, one broker fails to rejoin the cluster after restart

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: core
    • Labels: None
    • Environment: the version of Kafka we use is trunk 2a59ad76c6

    Description

      My Kafka cluster has two brokers.

      One of them was killed by accident yesterday. After that, I tried to restart the broker, but it seems to have failed to rejoin the cluster.

      It registered itself in ZooKeeper successfully, but after we sent log data, nothing showed up in its Kafka data directory.

      This is the broker's log after the restart:

      [2012-07-21 17:34:27,807] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=kafka:type=kafka.KafkaLog4j (root)
      [2012-07-21 17:34:27,808] DEBUG Adding AppenderMBean for appender named fileAppender (org.apache.log4j.jmx.LoggerDynamicMBean)
      [2012-07-21 17:34:27,810] DEBUG getMBeanInfo called. (org.apache.log4j.jmx.AppenderDynamicMBean)
      [2012-07-21 17:34:27,810] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=log4j:appender=fileAppender (org.apache.log4j.jmx.AppenderDynamicMBean)
      [2012-07-21 17:34:27,810] DEBUG Adding LayoutMBean:fileAppender,layout=org.apache.log4j.PatternLayout (org.apache.log4j.jmx.AppenderDynamicMBean)
      [2012-07-21 17:34:27,811] DEBUG getMBeanInfo called. (org.apache.log4j.jmx.LayoutDynamicMBean)
      [2012-07-21 17:34:27,811] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=log4j:appender=fileAppender,layout=org.apache.log4j.PatternLayout (org.apache.log4j.jmx.LayoutDynamicMBean)
      [2012-07-21 17:34:27,967] INFO The number of partitions for topic no_appkey : 1 (kafka.utils.Utils$)
      [2012-07-21 17:34:27,969] INFO The number of partitions for topic parse_exception : 1 (kafka.utils.Utils$)
      [2012-07-21 17:34:27,972] INFO Starting Kafka server... (kafka.server.KafkaServer)
      [2012-07-21 17:34:27,984] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
      [2012-07-21 17:34:27,990] INFO connecting to ZK: XX.XX.XX.229:2181/kafka (kafka.server.KafkaZooKeeper)
      [2012-07-21 17:34:27,999] DEBUG Creating new ZookKeeper instance to connect to XX.XX.XX.229:2181/kafka. (org.I0Itec.zkclient.ZkConnection)
      [2012-07-21 17:34:27,999] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
      [2012-07-21 17:34:28,006] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:host.name=mobile-1 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.version=1.6.0_17 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.class.path=:/home/Our_Server/kafka/libs/jopt-simple-3.2.jar:/home/Our_Server/kafka/libs/log4j-1.2.15.jar:/home/Our_Server/kafka/libs/scala-compiler.jar:/home/Our_Server/kafka/libs/scala-library.jar:/home/Our_Server/kafka/libs/snappy-java-1.0.4.1.jar:/home/Our_Server/kafka/libs/zkclient-0.1.jar:/home/Our_Server/kafka/libs/zookeeper-3.3.4.jar:/home/Our_Server/kafka/kafka-trunk-2a59ad76c6_scala-2.8.0.jar (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.library.path=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:os.version=2.6.32-71.29.1.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:user.name=Our_Server (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:user.home=/home/Our_Server (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,006] INFO Client environment:user.dir=/home/Our_Server/kafka (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,007] INFO Initiating client connection, connectString=XX.XX.XX.229:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@3c6210fb (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 17:34:28,009] DEBUG zookeeper.disableAutoWatchReset is false (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,024] INFO Opening socket connection to server /XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,028] INFO Socket connection established to mobile-1/XX.XX.XX.229:2181, initiating session (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,030] DEBUG Session establishment request sent on mobile-1/XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,048] INFO Session establishment complete on server mobile-1/XX.XX.XX.229:2181, sessionid = 0x1389e95409a0b65, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,049] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
      [2012-07-21 17:34:28,147] INFO Awaiting connections on port 9092 (kafka.network.Acceptor)
      [2012-07-21 17:34:28,149] DEBUG Will try to load MX4j now, if it's in the classpath (kafka.utils.Mx4jLoader$)
      [2012-07-21 17:34:28,149] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
      [2012-07-21 17:34:28,150] INFO Registering broker /brokers/ids/1 (kafka.server.KafkaZooKeeper)
      [2012-07-21 17:34:28,173] DEBUG Reading reply sessionid:0x1389e95409a0b65, packet:: clientPath:null serverPath:null finished:false header:: 1,1 replyHeader:: 1,8312179,0 request:: '/kafka/brokers/ids/1,#3132302e3139372e38342e3232392d313334323836333236383135303a3132302e3139372e38342e3232393a39303932,v{s{31,s{'world,'anyone}}},1 response:: '/kafka/brokers/ids/1 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:28,174] INFO Registering broker /brokers/ids/1 succeeded with id:1,creatorId:XX.XX.XX.229-1342863268150,host:XX.XX.XX.229,port:9092 (kafka.server.KafkaZooKeeper)
      [2012-07-21 17:34:28,188] INFO Starting log flusher every 1000 ms with the following overrides Map() (kafka.log.LogManager)
      [2012-07-21 17:34:28,189] INFO Kafka server started. (kafka.server.KafkaServer)
      [2012-07-21 17:34:29,189] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:30,158] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:30,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:31,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:32,158] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:32,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:33,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:34,158] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:34,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:35,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:36,157] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:36,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:37,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:38,168] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 10ms (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 17:34:38,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:39,188] DEBUG flushing the high watermark of all logs (kafka.log.LogManager)
      [2012-07-21 17:34:40,157] DEBUG Got ping response for sessionid: 0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
      ...
      ...
      ...

      This is the producer log before it sends any data to the brokers:

      [2012-07-21 20:10:33,217] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
      [2012-07-21 20:10:33,222] INFO Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:host.name=mobile-2 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.version=1.7.0_02 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.home=/opt/jdk1.7.0/jre (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.class.path=/home/our_server_name/apps/our_service/current/our_service_2.9.1-0.2.2.jar (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,222] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,223] INFO Client environment:os.version=2.6.32-71.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,223] INFO Client environment:user.name=our_server_name (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,223] INFO Client environment:user.home=/home/our_server_name (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,223] INFO Client environment:user.dir=/home/our_server_name/apps/our_service/releases/20120716050825 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,223] INFO Initiating client connection, connectString=XX.XX.XX.229:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@9875096 (org.apache.zookeeper.ZooKeeper)
      [2012-07-21 20:10:33,236] INFO Opening socket connection to server /XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 20:10:33,239] INFO Socket connection established to mobile-1/XX.XX.XX.229:2181, initiating session (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 20:10:33,253] INFO Session establishment complete on server mobile-1/XX.XX.XX.229:2181, sessionid = 0x1389e95409a0c02, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
      [2012-07-21 20:10:33,255] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
      [2012-07-21 20:10:33,275] DEBUG Broker ids and # of partitions on each for topic: nrt-test = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,293] DEBUG Sorted list of broker ids and partition ids on each for topic: nrt-test = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,295] DEBUG Broker ids and # of partitions on each for topic: app_log = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,297] DEBUG Sorted list of broker ids and partition ids on each for topic: app_log = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,299] DEBUG Broker ids and # of partitions on each for topic: install-test = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,300] DEBUG Sorted list of broker ids and partition ids on each for topic: install-test = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,302] DEBUG Broker ids and # of partitions on each for topic: nrt-test-less = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,304] DEBUG Sorted list of broker ids and partition ids on each for topic: nrt-test-less = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,306] DEBUG Broker ids and # of partitions on each for topic: no_appkey = ArrayBuffer((0,1)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,306] DEBUG Sorted list of broker ids and partition ids on each for topic: no_appkey = TreeSet(0-0) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,308] DEBUG Broker ids and # of partitions on each for topic: parse_exception = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,309] DEBUG Sorted list of broker ids and partition ids on each for topic: parse_exception = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,315] DEBUG [BrokerTopicsListener] Creating broker topics listener to watch the following paths -
      /broker/topics, /broker/topics/topic, /broker/ids (kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener)
      [2012-07-21 20:10:33,318] DEBUG [BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to partition id per topic with Map(app_log -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), nrt-test-less -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), nrt-test -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), install-test -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), no_appkey -> TreeSet(0-0), parse_exception -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31)) (kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener)
      [2012-07-21 20:10:33,328] DEBUG Registering listener on path: /brokers/topics/app_log (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,329] DEBUG Registering listener on path: /brokers/topics/nrt-test-less (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,331] DEBUG Registering listener on path: /brokers/topics/nrt-test (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,332] DEBUG Registering listener on path: /brokers/topics/no_appkey (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,333] DEBUG Registering listener on path: /brokers/topics/install-test (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,334] DEBUG Registering listener on path: /brokers/topics/parse_exception (kafka.producer.ZKBrokerPartitionInfo)
      [2012-07-21 20:10:33,342] TRACE Instantiating Scala Sync Producer (kafka.producer.SyncProducer)
      [2012-07-21 20:10:33,357] INFO Creating async producer for broker id = 1 at XX.XX.XX.229:9092 (kafka.producer.ProducerPool)
      [2012-07-21 20:10:33,357] TRACE Instantiating Scala Sync Producer (kafka.producer.SyncProducer)
      [2012-07-21 20:10:33,359] INFO Creating async producer for broker id = 0 at XX.XX.XX.233:9092 (kafka.producer.ProducerPool)
      [2012-07-21 20:10:38,366] DEBUG 5007 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:38,366] DEBUG 5008 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:38,367] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:38,367] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:43,367] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:43,367] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:43,367] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:10:43,368] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      ...
      ...
      ...

      And this is the producer log after sending some data:

      [2012-07-21 20:12:43,382] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:48,382] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:48,382] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:48,382] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:48,383] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:48,672] DEBUG Getting the number of broker partitions registered for topic: app_log (kafka.producer.Producer)
      [2012-07-21 20:12:48,675] DEBUG Broker partitions registered for topic: app_log = ArrayBuffer(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.Producer)
      [2012-07-21 20:12:48,675] DEBUG Sending message to broker XX.XX.XX.233:9092 on partition 26 (kafka.producer.Producer)
      [2012-07-21 20:12:48,679] DEBUG Fetching async producer for broker id: 0 (kafka.producer.ProducerPool)
      [2012-07-21 20:12:48,683] TRACE Added event to send queue for topic: app_log, partition: 26:{"os":"Android","access_subtype":"EDGE",...,"tag":"Sync"} (kafka.producer.async.AsyncProducer)
      [2012-07-21 20:12:48,683] TRACE Remaining queue size: 10000 (kafka.producer.async.AsyncProducer)
      [2012-07-21 20:12:48,683] DEBUG Sending compressed messages (kafka.producer.ProducerPool)
      [2012-07-21 20:12:48,683] TRACE Dequeued item for topic app_log and partition 26 (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:53,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:53,383] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:53,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:53,383] DEBUG Handling 1 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:53,384] TRACE Handling event for Topic: app_log, Partition: 26 (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:12:53,391] TRACE Sending 1 messages with compression codec 1 to topic app_log on partition 26 (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:12:53,399] DEBUG Allocating message byte buffer of size = 696 (kafka.message.CompressionUtils$)
      [2012-07-21 20:12:53,407] TRACE Remaining bytes in iterator = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,407] TRACE size of data = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,408] TRACE shallow iterator currValidBytes = 480 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,409] TRACE Got multi message sets with 480 bytes to send (kafka.producer.SyncProducer)
      [2012-07-21 20:12:53,412] TRACE verifying sendbuffer of size 501 (kafka.producer.SyncProducer)
      [2012-07-21 20:12:53,414] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,414] TRACE Remaining bytes in iterator = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,414] TRACE size of data = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,414] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,421] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,421] TRACE Remaining bytes in iterator = 692 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,421] TRACE size of data = 692 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,422] DEBUG Message is uncompressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,422] TRACE currValidBytes = 696 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,422] DEBUG makeNext() in internalIterator: innerDone = false (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,422] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,423] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,424] INFO Connected to XX.XX.XX.233:9092 for producing (kafka.producer.SyncProducer)
      [2012-07-21 20:12:53,425] TRACE 505 bytes written. (kafka.network.BoundedByteBufferSend)
      [2012-07-21 20:12:53,430] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,430] TRACE Remaining bytes in iterator = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] TRACE size of data = 476 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] TRACE Remaining bytes in iterator = 692 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] TRACE size of data = 692 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,431] DEBUG Message is uncompressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,432] TRACE currValidBytes = 696 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,432] DEBUG makeNext() in internalIterator: innerDone = false (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,433] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,435] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:12:53,435] TRACE kafka producer sent messages for topics Map((app_log,26) -> ByteBufferMessageSet(MessageAndOffset(message(magic = 1, attributes = 0, crc = 296392111, payload = java.nio.HeapByteBuffer[pos=0 lim=686 cap=686]),480), )) to broker XX.XX.XX.233:9092 (on attempt 1) (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:12:58,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:58,383] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:58,435] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:58,436] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:59,427] DEBUG Getting the number of broker partitions registered for topic: app_log (kafka.producer.Producer)
      [2012-07-21 20:12:59,428] DEBUG Broker partitions registered for topic: app_log = ArrayBuffer(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.Producer)
      [2012-07-21 20:12:59,428] DEBUG Sending message to broker XX.XX.XX.233:9092 on partition 26 (kafka.producer.Producer)
      [2012-07-21 20:12:59,428] DEBUG Fetching async producer for broker id: 0 (kafka.producer.ProducerPool)
      [2012-07-21 20:12:59,429] TRACE Dequeued item for topic app_log and partition 26 (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:12:59,429] TRACE Added event to send queue for topic: app_log, partition: 26:{"os":"Android","access_subtype":"EDGE",...,"city":""} (kafka.producer.async.AsyncProducer)
      [2012-07-21 20:12:59,429] TRACE Remaining queue size: 10000 (kafka.producer.async.AsyncProducer)
      [2012-07-21 20:12:59,429] DEBUG Sending compressed messages (kafka.producer.ProducerPool)
      [2012-07-21 20:13:03,384] DEBUG 5001 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:03,384] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:03,436] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:03,436] DEBUG Handling 1 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:03,436] TRACE Handling event for Topic: app_log, Partition: 26 (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:13:03,437] TRACE Sending 1 messages with compression codec 1 to topic app_log on partition 26 (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:13:03,437] DEBUG Allocating message byte buffer of size = 725 (kafka.message.CompressionUtils$)
      [2012-07-21 20:13:03,437] TRACE Remaining bytes in iterator = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,437] TRACE size of data = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,437] TRACE shallow iterator currValidBytes = 472 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,438] TRACE Got multi message sets with 472 bytes to send (kafka.producer.SyncProducer)
      [2012-07-21 20:13:03,438] TRACE verifying sendbuffer of size 493 (kafka.producer.SyncProducer)
      [2012-07-21 20:13:03,438] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,438] TRACE Remaining bytes in iterator = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,438] TRACE size of data = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,438] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,438] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] TRACE Remaining bytes in iterator = 721 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] TRACE size of data = 721 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] DEBUG Message is uncompressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] TRACE currValidBytes = 725 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = false (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,439] TRACE 497 bytes written. (kafka.network.BoundedByteBufferSend)
      [2012-07-21 20:13:03,440] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,440] TRACE Remaining bytes in iterator = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,440] TRACE size of data = 468 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,440] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,440] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,440] TRACE Remaining bytes in iterator = 721 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] TRACE size of data = 721 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] DEBUG Message is uncompressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] TRACE currValidBytes = 725 (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = false (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
      [2012-07-21 20:13:03,441] TRACE kafka producer sent messages for topics Map((app_log,26) -> ByteBufferMessageSet(MessageAndOffset(message(magic = 1, attributes = 0, crc = 1163603178, payload = java.nio.HeapByteBuffer[pos=0 lim=715 cap=715]),472), )) to broker XX.XX.XX.233:9092 (on attempt 1) (kafka.producer.async.DefaultEventHandler)
      [2012-07-21 20:13:08,385] DEBUG 5000 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:08,385] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:08,442] DEBUG 5001 ms elapsed. Queue time reached. Sending.. (kafka.producer.async.ProducerSendThread)
      [2012-07-21 20:13:08,442] DEBUG Handling 0 events (kafka.producer.async.ProducerSendThread)
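
As a reading aid for the producer logs above (not part of the original report), the "Broker ids and # of partitions on each for topic" lines can be summarized with a short script. This is a minimal sketch that assumes the DEBUG line format shown in the log. The function names `partitions_per_broker` and `brokers_without_partitions` are hypothetical helpers, not Kafka APIs. In the log above, every topic lists only broker 0, and both sample messages go to XX.XX.XX.233 (broker id 0), which is consistent with broker 1 receiving no data.

```python
import re
from collections import defaultdict

# Matches producer DEBUG lines of the form seen in the report, e.g.
#   ... Broker ids and # of partitions on each for topic: app_log = ArrayBuffer((0,32)) (...)
TOPIC_LINE = re.compile(
    r"Broker ids and # of partitions on each for topic: (\S+) = ArrayBuffer\((.*?)\) \("
)
# Matches one (broker_id,partition_count) pair inside the ArrayBuffer.
PAIR = re.compile(r"\((\d+),(\d+)\)")


def partitions_per_broker(log_lines):
    """Return {topic: {broker_id: partition_count}} parsed from producer log lines."""
    result = defaultdict(dict)
    for line in log_lines:
        match = TOPIC_LINE.search(line)
        if not match:
            continue  # not a broker/partition summary line
        topic, pairs = match.group(1), match.group(2)
        for broker_id, count in PAIR.findall(pairs):
            result[topic][int(broker_id)] = int(count)
    return dict(result)


def brokers_without_partitions(mapping, expected_broker_ids):
    """Return the expected broker ids that no topic lists any partitions for."""
    seen = {broker for brokers in mapping.values() for broker in brokers}
    return sorted(set(expected_broker_ids) - seen)


# Two lines copied from the producer log in this report.
sample = [
    "[2012-07-21 20:10:33,275] DEBUG Broker ids and # of partitions on each for topic: "
    "nrt-test = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)",
    "[2012-07-21 20:10:33,306] DEBUG Broker ids and # of partitions on each for topic: "
    "no_appkey = ArrayBuffer((0,1)) (kafka.producer.ZKBrokerPartitionInfo)",
]

mapping = partitions_per_broker(sample)
print(mapping)                                      # {'nrt-test': {0: 32}, 'no_appkey': {0: 1}}
print(brokers_without_partitions(mapping, [0, 1]))  # [1]
```

The script only summarizes what the producer read from ZooKeeper. It does not explain why broker 1 is missing from the topic registrations.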

          People

            Assignee: Unassigned
            Reporter: mmliu
            Votes: 0
            Watchers: 1
