Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1200390) +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy) @@ -24,7 +24,7 @@ import kafka.consumer.SimpleConsumer import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import junit.framework.Assert._ import kafka.api.FetchRequest import kafka.serializer.Encoder @@ -33,7 +33,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} -class KafkaLog4jAppenderTest extends JUnitSuite { +class KafkaLog4jAppenderTest extends JUnitSuite with LogHelper { var logDir: File = null // var topicLogDir: File = null @@ -130,10 +130,9 @@ @Test def testLog4jAppends() { PropertyConfigurator.configure(getLog4jConfig) - val logger = Logger.getLogger(classOf[KafkaLog4jAppenderTest]) for(i <- 1 to 5) - logger.info("test") + info("test") Thread.sleep(500) Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (revision 1200390) +++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (working copy) @@ -21,14 +21,14 @@ import kafka.zk.ZooKeeperTestHarness import java.nio.channels.ClosedByInterruptException import java.util.concurrent.atomic.AtomicInteger -import kafka.utils.ZKGroupTopicDirs +import kafka.utils.{ZKGroupTopicDirs, LogHelper} import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer} import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite import kafka.utils.{TestUtils, TestZKUtils} -class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness { +class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness with LogHelper { val zkConnect = TestZKUtils.zookeeperConnect val topic = "test_topic" @@ -41,7 +41,6 @@ val largeOffset = 10000 val smallOffset = -1 - private val logger = Logger.getLogger(getClass()) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers]) override def setUp() { @@ -74,7 +73,7 @@ val consumerConfig = new ConsumerConfig(consumerProps) TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset) - logger.info("Updated consumer offset to " + largeOffset) + info("Updated consumer offset to " + largeOffset) Thread.sleep(500) val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) @@ -93,7 +92,7 @@ } } catch { - case te: ConsumerTimeoutException => logger.info("Consumer thread timing out..") + case te: ConsumerTimeoutException => info("Consumer thread timing out..") case _: InterruptedException => case _: ClosedByInterruptException => case e => throw e @@ -108,7 +107,7 @@ threadList(0).join(2000) - logger.info("Asserting...") + info("Asserting...") assertEquals(numMessages, nMessages.get) consumerConnector.shutdown } @@ -128,7 +127,7 @@ val consumerConfig = new ConsumerConfig(consumerProps) TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", smallOffset) - logger.info("Updated consumer offset to " + smallOffset) + info("Updated consumer offset to " + smallOffset) val consumerConnector: 
ConsumerConnector = Consumer.create(consumerConfig) @@ -161,7 +160,7 @@ threadList(0).join(2000) - logger.info("Asserting...") + info("Asserting...") assertEquals(numMessages, nMessages.get) consumerConnector.shutdown } @@ -181,7 +180,7 @@ val consumerConfig = new ConsumerConfig(consumerProps) TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset) - logger.info("Updated consumer offset to " + largeOffset) + info("Updated consumer offset to " + largeOffset) val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) @@ -214,7 +213,7 @@ threadList(0).join(2000) - logger.info("Asserting...") + info("Asserting...") assertEquals(0, nMessages.get) consumerConnector.shutdown Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1200390) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -22,15 +22,14 @@ import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import scala.collection._ -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.utils.{TestZKUtils, TestUtils} import org.scalatest.junit.JUnit3Suite import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer.StringDecoder -class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness { - private val logger = Logger.getLogger(getClass()) +class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with LogHelper { val zookeeperConnect = TestZKUtils.zookeeperConnect val zkConnect = zookeeperConnect @@ -117,7 +116,7 @@ zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown - logger.info("all consumer connectors stopped") + info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } @@ -170,7 +169,7 @@ zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown - logger.info("all consumer connectors stopped") + info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } @@ -232,7 +231,7 @@ assertTrue(iterator.hasNext()) val message = iterator.next() receivedMessages ::= message - logger.debug("received message: " + message) + debug("received message: " + message) } } } @@ -275,7 +274,7 @@ assertTrue(iterator.hasNext) val message = iterator.next messages ::= message - logger.debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.toString(message.payload, "UTF-8")) } } } Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (revision 1200390) +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -22,7 +22,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import scala.collection._ -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.utils.{TestZKUtils, TestUtils} import org.scalatest.junit.JUnit3Suite import scala.collection.JavaConversions._ @@ -33,8 +33,7 @@ import kafka.message.{NoCompressionCodec, 
DefaultCompressionCodec, CompressionCodec, Message} import kafka.serializer.StringDecoder -class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness { - private val logger = Logger.getLogger(getClass()) +class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with LogHelper { val zookeeperConnect = TestZKUtils.zookeeperConnect val zkConnect = zookeeperConnect @@ -119,7 +118,7 @@ zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown - logger.info("all consumer connectors stopped") + info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } @@ -169,7 +168,7 @@ zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown - logger.info("all consumer connectors stopped") + info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } @@ -230,7 +229,7 @@ assertTrue(iterator.hasNext()) val message = iterator.next() receivedMessages ::= message - logger.debug("received message: " + message) + debug("received message: " + message) } } } @@ -276,7 +275,7 @@ assertTrue(iterator.hasNext) val message = iterator.next messages ::= message - logger.debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.toString(message.payload, "UTF-8")) } } } Index: core/src/test/scala/other/kafka/TestKafkaAppender.scala =================================================================== --- core/src/test/scala/other/kafka/TestKafkaAppender.scala (revision 1200390) +++ core/src/test/scala/other/kafka/TestKafkaAppender.scala (working copy) @@ -19,11 +19,10 @@ import message.Message import org.apache.log4j.{Logger, PropertyConfigurator} +import kafka.utils.LogHelper import serializer.Encoder -object TestKafkaAppender { - - private val logger = Logger.getLogger(TestKafkaAppender.getClass) +object TestKafkaAppender extends LogHelper { def main(args:Array[String]) { @@ -41,7 +40,7 @@ } for(i <- 1 to 10) - logger.info("test") + info("test") } } Index: core/src/main/scala/kafka/Kafka.scala =================================================================== --- core/src/main/scala/kafka/Kafka.scala (revision 1200390) +++ core/src/main/scala/kafka/Kafka.scala (working copy) @@ -18,17 +18,16 @@ package kafka import consumer.ConsumerConfig -import org.apache.log4j.Logger import producer.ProducerConfig import server.{KafkaConfig, KafkaServerStartable, KafkaServer} -import utils.Utils +import utils.{Utils, LogHelper} import org.apache.log4j.jmx.LoggerDynamicMBean -object Kafka { - private val logger = Logger.getLogger(Kafka.getClass) +object Kafka extends LogHelper { def main(args: Array[String]): Unit = { val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j" + import org.apache.log4j.Logger Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)) if (!List(1, 3).contains(args.length)) { @@ -61,7 +60,7 @@ kafkaServerStartble.awaitShutdown } catch { - case e => logger.fatal(e) + case e => fatal(e) } System.exit(0) } Index: core/src/main/scala/kafka/tools/ConsumerPerformance.scala =================================================================== --- core/src/main/scala/kafka/tools/ConsumerPerformance.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/ConsumerPerformance.scala (working copy) @@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import 
java.nio.channels.ClosedByInterruptException import joptsimple._ -import org.apache.log4j.Logger -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.consumer.{ConsumerConfig, ConsumerConnector, Consumer} abstract class ShutdownableThread(name: String) extends Thread(name) { @@ -33,8 +32,7 @@ /** * Performance test for the full zookeeper consumer */ -object ConsumerPerformance { - private val logger = Logger.getLogger(getClass()) +object ConsumerPerformance extends LogHelper { def main(args: Array[String]): Unit = { @@ -127,7 +125,7 @@ } private def printMessage(totalBytesRead: Long, nMessages: Long, elapsedSecs: Double) = { - logger.info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead + + info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead + " nMsgs/sec:" + (nMessages / elapsedSecs).formatted("%.2f") + " MB/sec:" + (totalBytesRead / elapsedSecs / (1024.0*1024.0)).formatted("%.2f")) @@ -135,9 +133,9 @@ private def shutdownComplete() = shutdownLatch.countDown } - logger.info("Sleeping for " + initialSleep / 1000 + " seconds.") + info("Sleeping for " + initialSleep / 1000 + " seconds.") Thread.sleep(initialSleep) - logger.info("starting threads") + info("starting threads") for (thread <- threadList) thread.start Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala =================================================================== --- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy) @@ -23,17 +23,14 @@ import kafka.utils._ import kafka.consumer._ import kafka.server._ -import org.apache.log4j.Logger /** * Command line program to dump out messages to standard out using the simple consumer */ -object SimpleConsumerShell { +object SimpleConsumerShell extends LogHelper { def main(args: Array[String]): Unit = { - val logger = Logger.getLogger(getClass) - val parser = new OptionParser val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.") .withRequiredArg @@ -73,7 +70,7 @@ for(arg <- List(urlOpt, topicOpt)) { if(!options.has(arg)) { - logger.error("Missing required argument \"" + arg + "\"") + error("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } @@ -87,7 +84,7 @@ val printOffsets = if(options.has(printOffsetOpt)) true else false val printMessages = if(options.has(printMessageOpt)) true else false - logger.info("Starting consumer...") + info("Starting consumer...") val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024) val thread = Utils.newThread("kafka-consumer", new Runnable() { def run() { @@ -96,15 +93,14 @@ val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize) val messageSets = consumer.multifetch(fetchRequest) for (messages <- messageSets) { - if(logger.isDebugEnabled) - logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) + debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) var consumed = 0 for(messageAndOffset <- messages) { if(printMessages) - logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) offset = messageAndOffset.offset if(printOffsets) - logger.info("next offset = " + offset) + info("next offset = " + offset) consumed += 1 } } Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala 
=================================================================== --- core/src/main/scala/kafka/tools/ReplayLogProducer.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala (working copy) @@ -26,15 +26,14 @@ import kafka.serializer.{DefaultEncoder, StringEncoder} import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer} import kafka.consumer._ -import kafka.utils.{ZKStringSerializer, Utils} +import kafka.utils.{ZKStringSerializer, Utils, LogHelper} import kafka.api.OffsetRequest import org.I0Itec.zkclient._ import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet} -object ReplayLogProducer { +object ReplayLogProducer extends LogHelper { private val GROUPID: String = "replay-log-producer" - private val logger = Logger.getLogger(getClass) def main(args: Array[String]) { var isNoPrint = false; @@ -147,7 +146,7 @@ def tryCleanupZookeeper(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId - logger.info("Cleaning up temporary zookeeper data under " + dir + ".") + info("Cleaning up temporary zookeeper data under " + dir + ".") val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) zk.deleteRecursive(dir) zk.close() @@ -156,9 +155,8 @@ } } - class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread { + class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with LogHelper { val shutdownLatch = new CountDownLatch(1) - val logger = Logger.getLogger(getClass) val props = new Properties() val brokerInfoList = config.brokerInfo.split("=") if (brokerInfoList(0) == "zk.connect") @@ -179,7 +177,7 @@ null, new DefaultPartitioner[Message]) override def run() { - logger.info("Starting consumer thread..") + info("Starting consumer thread..") var messageCount: Int = 0 try { val iter = @@ -194,15 +192,15 @@ Thread.sleep(config.delayedMSBtwSend) messageCount += 1 }catch { - case ie: Exception => logger.error("Skipping this message", ie) + case ie: Exception => error("Skipping this message", ie) } } }catch { - case e: ConsumerTimeoutException => logger.error("consumer thread timing out", e) + case e: ConsumerTimeoutException => error("consumer thread timing out", e) } - logger.info("Sent " + messageCount + " messages") + info("Sent " + messageCount + " messages") shutdownLatch.countDown - logger.info("thread finished execution !" ) + info("thread finished execution !" 
) } def shutdown() { Index: core/src/main/scala/kafka/tools/ConsumerShell.scala =================================================================== --- core/src/main/scala/kafka/tools/ConsumerShell.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/ConsumerShell.scala (working copy) @@ -18,9 +18,8 @@ package kafka.tools import joptsimple._ -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import java.util.concurrent.CountDownLatch -import org.apache.log4j.Logger import kafka.consumer._ import kafka.serializer.StringDecoder @@ -28,7 +27,6 @@ * Program to read using the rich consumer and dump the results to standard out */ object ConsumerShell { - val logger = Logger.getLogger(getClass) def main(args: Array[String]): Unit = { val parser = new OptionParser @@ -84,9 +82,8 @@ } } -class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread { +class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with LogHelper { val shutdownLatch = new CountDownLatch(1) - val logger = Logger.getLogger(getClass) override def run() { println("Starting consumer thread..") @@ -98,7 +95,7 @@ } }catch { case e:ConsumerTimeoutException => // this is ok - case oe: Exception => logger.error("error in ZKConsumerThread", oe) + case oe: Exception => error("error in ZKConsumerThread", oe) } shutdownLatch.countDown println("Received " + count + " messages") Index: core/src/main/scala/kafka/tools/ProducerPerformance.scala =================================================================== --- core/src/main/scala/kafka/tools/ProducerPerformance.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/ProducerPerformance.scala (working copy) @@ -17,13 +17,12 @@ package kafka.tools -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong import kafka.producer._ import async.DefaultEventHandler import kafka.serializer.StringEncoder -import org.apache.log4j.Logger import joptsimple.{OptionSet, OptionParser} import java.util.{Random, Properties} import kafka.message.{CompressionCodec, Message, ByteBufferMessageSet} @@ -31,14 +30,13 @@ /** * Load test for the producer */ -object ProducerPerformance { +object ProducerPerformance extends LogHelper { def main(args: Array[String]) { - val logger = Logger.getLogger(getClass) val config = new PerfConfig(args) if(!config.isFixSize) - logger.info("WARN: Throughput will be slower due to changing message size per request") + info("WARN: Throughput will be slower due to changing message size per request") val totalBytesSent = new AtomicLong(0) val totalMessagesSent = new AtomicLong(0) @@ -56,9 +54,9 @@ allDone.await() val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0 - logger.info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs") - logger.info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f")) - logger.info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f")) + info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs") + info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f")) + info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f")) System.exit(0) } @@ -136,8 +134,7 @@ val totalBytesSent: AtomicLong, val totalMessagesSent: AtomicLong, val allDone: 
CountDownLatch, - val rand: Random) extends Runnable { - val logger = Logger.getLogger(getClass) + val rand: Random) extends Runnable with LogHelper { val brokerInfoList = config.brokerInfo.split("=") val props = new Properties() if (brokerInfoList(0) == "zk.connect") @@ -151,7 +148,7 @@ props.put("reconnect.interval", Integer.MAX_VALUE.toString) props.put("buffer.size", (64*1024).toString) props.put("queue.enqueueTimeout.ms", "-1") - logger.info("Producer properties = " + props.toString) + info("Producer properties = " + props.toString) val producerConfig = new ProducerConfig(props) val producer = new Producer[String, String](producerConfig, new StringEncoder, @@ -166,7 +163,7 @@ var reportTime = System.currentTimeMillis() var lastReportTime = reportTime val messagesPerThread = config.numMessages / config.numThreads - logger.info("Messages per thread = " + messagesPerThread) + info("Messages per thread = " + messagesPerThread) for(j <- 0 until messagesPerThread) { var strLength = config.messageSize if (!config.isFixSize) { @@ -183,7 +180,7 @@ } if(nSends % config.reportingInterval == 0) { reportTime = System.currentTimeMillis() - logger.info("thread " + threadId + ": " + nSends + " messages sent " + info("thread " + threadId + ": " + nSends + " messages sent " + (1000.0 * (nSends - lastNSends) / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec " + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec") lastReportTime = reportTime @@ -203,8 +200,7 @@ val totalBytesSent: AtomicLong, val totalMessagesSent: AtomicLong, val allDone: CountDownLatch, - val rand: Random) extends Runnable { - val logger = Logger.getLogger(getClass) + val rand: Random) extends Runnable with LogHelper { val props = new Properties() val brokerInfoList = config.brokerInfo.split("=") if (brokerInfoList(0) == "zk.connect") @@ -228,7 +224,7 @@ var reportTime = System.currentTimeMillis() var lastReportTime = reportTime val messagesPerThread = config.numMessages / config.numThreads / config.batchSize - logger.info("Messages per thread = " + messagesPerThread) + info("Messages per thread = " + messagesPerThread) var messageSet: List[String] = Nil for(k <- 0 until config.batchSize) { messageSet ::= message @@ -251,7 +247,7 @@ } if(nSends % config.reportingInterval == 0) { reportTime = System.currentTimeMillis() - logger.info("thread " + threadId + ": " + nSends + " messages sent " + info("thread " + threadId + ": " + nSends + " messages sent " + (1000.0 * (nSends - lastNSends) * config.batchSize / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec " + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec") lastReportTime = reportTime Index: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala =================================================================== --- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (revision 1200390) +++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (working copy) @@ -20,12 +20,10 @@ import joptsimple._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZkUtils, ZKStringSerializer} -import org.apache.log4j.Logger +import kafka.utils.{ZkUtils, ZKStringSerializer, LogHelper} import kafka.consumer.SimpleConsumer import collection.mutable.Map -object ConsumerOffsetChecker { - private val logger = Logger.getLogger(getClass) +object ConsumerOffsetChecker extends LogHelper { private val consumerMap: Map[String, 
Option[SimpleConsumer]] = Map() @@ -40,7 +38,7 @@ case BrokerIpPattern(ip, port) => Some(new SimpleConsumer(ip, port.toInt, 10000, 100000)) case _ => - logger.error("Could not parse broker info %s".format(brokerInfo)) + error("Could not parse broker info %s".format(brokerInfo)) None } consumer @@ -75,7 +73,7 @@ case None => // ignore } case _ => - logger.error("Could not parse broker/partition pair %s".format(bidPid)) + error("Could not parse broker/partition pair %s".format(bidPid)) } } @@ -139,7 +137,7 @@ zkClient, "/consumers/%s/offsets".format(group)).toList } - logger.debug("zkConnect = %s; topics = %s; group = %s".format( + debug("zkConnect = %s; topics = %s; group = %s".format( zkConnect, topicList.toString(), group)) topicList.foreach { Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1200390) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -33,8 +33,7 @@ /** * Helper functions! */ -object Utils { - private val logger = Logger.getLogger(getClass()) +object Utils extends LogHelper { /** * Wrap the given function in a java.lang.Runnable @@ -60,8 +59,8 @@ catch { case t => // log any error and the stack trace - logger.error(t, t) - logger.error(stackTrace(t), t) + error(t) + error(stackTrace(t), t) } } } @@ -525,10 +524,10 @@ { try{ val tempSplit = csVals(i).split(":") - logger.info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim)) + info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim)) map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V] } catch { - case _ => logger.error(exceptionMsg + ": " + csVals(i)) + case _ => error(exceptionMsg + ": " + csVals(i)) } } map Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1200390) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -23,13 +23,11 @@ import scala.collection._ import java.util.Properties import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} -import org.apache.log4j.Logger -object ZkUtils { +object ZkUtils extends LogHelper { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" - private val logger = Logger.getLogger(getClass()) /** * make sure a persistent path exists in ZK. Create the path if not exist. 
@@ -83,12 +81,12 @@ case e2 => throw e2 } if (storedData == null || storedData != data) { - logger.info("conflict in " + path + " data: " + data + " stored data: " + storedData) + info("conflict in " + path + " data: " + data + " stored data: " + storedData) throw e } else { // otherwise, the creation succeeded, return normally - logger.info(path + " exists with value " + data + " during connection loss; this is ok") + info(path + " exists with value " + data + " during connection loss; this is ok") } } case e2 => throw e2 @@ -142,7 +140,7 @@ catch { case e: ZkNoNodeException => // this can happen during a connection loss event, return normally - logger.info(path + " deleted during connection loss; this is ok") + info(path + " deleted during connection loss; this is ok") case e2 => throw e2 } } @@ -154,7 +152,7 @@ catch { case e: ZkNoNodeException => // this can happen during a connection loss event, return normally - logger.info(path + " deleted during connection loss; this is ok") + info(path + " deleted during connection loss; this is ok") case e2 => throw e2 } } Index: core/src/main/scala/kafka/utils/Mx4jLoader.scala =================================================================== --- core/src/main/scala/kafka/utils/Mx4jLoader.scala (revision 1200390) +++ core/src/main/scala/kafka/utils/Mx4jLoader.scala (working copy) @@ -20,7 +20,6 @@ import java.lang.management.ManagementFactory import javax.management.ObjectName -import org.apache.log4j.Logger /** * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j. @@ -31,8 +30,7 @@ * * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068 * */ -object Mx4jLoader { - private val logger = Logger.getLogger(getClass()) +object Mx4jLoader extends LogHelper { def maybeLoad(): Boolean = { if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false)) @@ -40,7 +38,7 @@ val address = System.getProperty("mx4jaddress", "0.0.0.0") val port = Utils.getInt(System.getProperties(), "mx4jport", 8082) try { - logger.debug("Will try to load MX4j now, if it's in the classpath"); + debug("Will try to load MX4j now, if it's in the classpath"); val mbs = ManagementFactory.getPlatformMBeanServer() val processorName = new ObjectName("Server:name=XSLTProcessor") @@ -58,15 +56,15 @@ httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef]) mbs.registerMBean(xsltProcessor, processorName) httpAdaptorClass.getMethod("start").invoke(httpAdaptor) - logger.info("mx4j successfuly loaded") + info("mx4j successfuly loaded") true } catch { case e: ClassNotFoundException => { - logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath"); + info("Will not load MX4J, mx4j-tools.jar is not in the classpath"); } case e => { - logger.warn("Could not start register mbean in JMX", e); + warn("Could not start register mbean in JMX", e); } } false Index: core/src/main/scala/kafka/utils/KafkaScheduler.scala =================================================================== --- core/src/main/scala/kafka/utils/KafkaScheduler.scala (revision 1200390) +++ core/src/main/scala/kafka/utils/KafkaScheduler.scala (working copy) @@ -20,14 +20,12 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import kafka.utils._ -import org.apache.log4j.Logger /** * A scheduler for running jobs in the background * TODO: ScheduledThreadPoolExecutor notriously swallows exceptions */ -class 
KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) { - private val logger = Logger.getLogger(getClass()) +class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends LogHelper { private val threadId = new AtomicLong(0) private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() { def newThread(runnable: Runnable): Thread = { @@ -44,11 +42,11 @@ def shutdownNow() { executor.shutdownNow() - logger.info("force shutdown scheduler " + baseThreadName) + info("force shutdown scheduler " + baseThreadName) } def shutdown() { executor.shutdown() - logger.info("shutdown scheduler " + baseThreadName) + info("shutdown scheduler " + baseThreadName) } } Index: core/src/main/scala/kafka/utils/Logging.scala =================================================================== --- core/src/main/scala/kafka/utils/Logging.scala (revision 0) +++ core/src/main/scala/kafka/utils/Logging.scala (revision 0) @@ -0,0 +1,53 @@ +package kafka.utils + +import org.apache.log4j.Logger + +trait LogHelper { + val loggerName = this.getClass.getName + lazy val logger = Logger.getLogger(loggerName) + + def trace(msg: => String) = { + if (logger.isTraceEnabled()) + logger.trace(msg) + } + def debug(msg: => String) = { + if (logger.isDebugEnabled()) + logger.debug(msg) + } + def debug(msg: => String, e: => Throwable) = { + if (logger.isDebugEnabled()) + logger.debug(msg,e) + } + def info(msg: => String) = { + if (logger.isInfoEnabled()) + logger.info(msg) + } + def info(msg: => String,e: => Throwable) = { + if (logger.isInfoEnabled()) + logger.info(msg,e) + } + def warn(msg: => String) = { + logger.warn(msg) + } + def warn(msg: => String, e: => Throwable) = { + logger.warn(msg,e) + } + def error(msg: => String):Unit = { + logger.error(msg) + } + def error(msg: => String, e: => Throwable) = { + logger.error(msg,e) + } + def error(e: => Throwable): Any = { + logger.error(e) + } + def fatal(msg: => String): Unit = { + logger.fatal(msg) + } + def fatal(msg: => String, e: => Throwable) = { + logger.fatal(msg,e) + } + def fatal(e: => Throwable): Any = { + logger.fatal(e) + } +} \ No newline at end of file Index: core/src/main/scala/kafka/utils/Throttler.scala =================================================================== --- core/src/main/scala/kafka/utils/Throttler.scala (revision 1200390) +++ core/src/main/scala/kafka/utils/Throttler.scala (working copy) @@ -17,11 +17,9 @@ package kafka.utils; -import org.apache.log4j.Logger import scala.math._ -object Throttler { - val logger = Logger.getLogger(classOf[Throttler]) +object Throttler extends LogHelper { val DefaultCheckIntervalMs = 100L } @@ -67,8 +65,7 @@ val ellapsedMs = ellapsedNs / Time.NsPerMs val sleepTime = round(observedSoFar / desiredRateMs - ellapsedMs) if(sleepTime > 0) { - if(Throttler.logger.isDebugEnabled) - Throttler.logger.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + + Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + ", sleeping for " + sleepTime + " ms to compensate.") time.sleep(sleepTime) } Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1200390) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -22,7 +22,6 @@ import java.text.NumberFormat import java.io._ import java.nio.channels.FileChannel -import org.apache.log4j._ import 
kafka.message._ import kafka.utils._ import kafka.common._ @@ -96,10 +95,8 @@ * An append-only log for storing messages. */ @threadsafe -private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) { +private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends LogHelper { - private val logger = Logger.getLogger(classOf[Log]) - /* A lock that guards all modifications to the log */ private val lock = new Object @@ -155,7 +152,7 @@ //make the final section mutable and run recovery on it if necessary val last = accum.remove(accum.size - 1) last.messageSet.close() - logger.info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery) + info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery) val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start) accum.add(mutable) } @@ -280,8 +277,7 @@ val last = segments.view.last val newOffset = nextAppendOffset val newFile = new File(dir, Log.nameFromOffset(newOffset)) - if(logger.isDebugEnabled) - logger.debug("Rolling log '" + name + "' to " + newFile.getName()) + debug("Rolling log '" + name + "' to " + newFile.getName()) segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset)) } } @@ -302,8 +298,7 @@ if (unflushed.get == 0) return lock synchronized { - if(logger.isDebugEnabled) - logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + + debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + System.currentTimeMillis) segments.view.last.messageSet.flush() unflushed.set(0) @@ -332,9 +327,7 @@ startIndex = 0 case _ => var isFound = false - if(logger.isDebugEnabled) { - logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) - } + debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) startIndex = offsetTimeArray.length - 1 while (startIndex >= 0 && !isFound) { if (offsetTimeArray(startIndex)._2 <= request.time) Index: core/src/main/scala/kafka/log/LogManager.scala =================================================================== --- core/src/main/scala/kafka/log/LogManager.scala (revision 1200390) +++ core/src/main/scala/kafka/log/LogManager.scala (working copy) @@ -18,7 +18,6 @@ package kafka.log import java.io._ -import org.apache.log4j.Logger import kafka.utils._ import scala.actors.Actor import scala.collection._ @@ -35,14 +34,13 @@ private val time: Time, val logCleanupIntervalMs: Long, val logCleanupDefaultAgeMs: Long, - needRecovery: Boolean) { + needRecovery: Boolean) extends LogHelper { val logDir: File = new File(config.logDir) private val numPartitions = config.numPartitions private val maxSize: Long = config.logFileSize private val flushInterval = config.flushInterval private val topicPartitionsMap = config.topicPartitionsMap - private val logger = Logger.getLogger(classOf[LogManager]) private val logCreationLock = new Object private val random = new java.util.Random private var kafkaZookeeper: KafkaZooKeeper = null @@ -56,7 +54,7 @@ /* Initialize a log for each subdirectory of the main log directory */ private val logs = new Pool[String, Pool[Int, Log]]() if(!logDir.exists()) { - logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'") + info("No log directory 
found, creating '" + logDir.getAbsolutePath() + "'") logDir.mkdirs() } if(!logDir.isDirectory() || !logDir.canRead()) @@ -65,9 +63,9 @@ if(subDirs != null) { for(dir <- subDirs) { if(!dir.isDirectory()) { - logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") + warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { - logger.info("Loading log '" + dir.getName() + "'") + info("Loading log '" + dir.getName() + "'") val log = new Log(dir, maxSize, flushInterval, needRecovery) val topicPartion = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]()) @@ -79,7 +77,7 @@ /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { - logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms") + info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) } @@ -95,10 +93,10 @@ kafkaZookeeper.registerTopicInZk(topic) } catch { - case e => logger.error(e) // log it and let it go + case e => error(e) // log it and let it go } case StopActor => - logger.info("zkActor stopped") + info("zkActor stopped") exit } } @@ -126,7 +124,7 @@ kafkaZookeeper.registerTopicInZk(topic) startupLatch.countDown } - logger.info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap) + info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap) logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate) } @@ -164,7 +162,7 @@ if (topic.length <= 0) throw new InvalidTopicException("topic name can't be empty") if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) { - logger.warn("Wrong partition " + partition + " valid partitions (0," + + warn("Wrong partition " + partition + " valid partitions (0," + (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")") throw new InvalidPartitionException("wrong partition " + partition) } @@ -186,7 +184,7 @@ log = found } else - logger.info("Created log for '" + topic + "'-" + partition) + info("Created log for '" + topic + "'-" + partition) } if (hasNewTopic) @@ -198,10 +196,10 @@ private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = { var total = 0 for(segment <- segments) { - logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name) + info("Deleting log segment " + segment.file.getName() + " from " + log.name) Utils.swallow(logger.warn, segment.messageSet.close()) if(!segment.file.delete()) { - logger.warn("Delete failed.") + warn("Delete failed.") } else { total += 1 } @@ -243,16 +241,16 @@ * Delete any eligible logs. Return the number of segments deleted. */ def cleanupLogs() { - logger.debug("Beginning log cleanup...") + debug("Beginning log cleanup...") val iter = getLogIterator var total = 0 val startMs = time.milliseconds while(iter.hasNext) { val log = iter.next - logger.debug("Garbage collecting '" + log.name + "'") + debug("Garbage collecting '" + log.name + "'") total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) } - logger.debug("Log cleanup completed. " + total + " files deleted in " + + debug("Log cleanup completed. 
" + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") } @@ -291,8 +289,7 @@ } private def flushAllLogs() = { - if (logger.isDebugEnabled) - logger.debug("flushing the high watermark of all logs") + debug("flushing the high watermark of all logs") for (log <- getLogIterator) { @@ -301,18 +298,17 @@ var logFlushInterval = config.defaultFlushIntervalMs if(logFlushIntervalMap.contains(log.getTopicName)) logFlushInterval = logFlushIntervalMap(log.getTopicName) - if (logger.isDebugEnabled) - logger.debug(log.getTopicName + " flush interval " + logFlushInterval + + debug(log.getTopicName + " flush interval " + logFlushInterval + " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush) if(timeSinceLastFlush >= logFlushInterval) log.flush } catch { case e => - logger.error("Error flushing topic " + log.getTopicName, e) + error("Error flushing topic " + log.getTopicName, e) e match { case _: IOException => - logger.fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e) + fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e) Runtime.getRuntime.halt(1) case _ => } Index: core/src/main/scala/kafka/producer/ProducerPool.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy) @@ -20,12 +20,11 @@ import async._ import java.util.Properties import kafka.serializer.Encoder -import org.apache.log4j.Logger import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap} import kafka.cluster.{Partition, Broker} import kafka.api.ProducerRequest import kafka.common.{UnavailableProducerException, InvalidConfigException} -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} class ProducerPool[V](private val config: ProducerConfig, @@ -33,9 +32,8 @@ private val syncProducers: ConcurrentMap[Int, SyncProducer], private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]], private val inputEventHandler: EventHandler[V] = null, - private val cbkHandler: CallbackHandler[V] = null) { + private val cbkHandler: CallbackHandler[V] = null) extends LogHelper { - private val logger = Logger.getLogger(classOf[ProducerPool[V]]) private var eventHandler = inputEventHandler if(eventHandler == null) eventHandler = new DefaultEventHandler(config, cbkHandler) @@ -76,7 +74,7 @@ props.putAll(config.props) if(sync) { val producer = new SyncProducer(new SyncProducerConfig(props)) - logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) + info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) syncProducers.put(broker.id, producer) } else { val producer = new AsyncProducer[V](new AsyncProducerConfig(props), @@ -85,7 +83,7 @@ eventHandler, config.eventHandlerProps, cbkHandler, config.cbkHandlerProps) producer.start - logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) + info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) asyncProducers.put(broker.id, producer) } } @@ -107,7 +105,7 @@ val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId, new ByteBufferMessageSet(compressionCodec = config.compressionCodec, 
messages = req.getData.map(d => serializer.toMessage(d)): _*))) - logger.debug("Fetching sync producer for broker id: " + bid) + debug("Fetching sync producer for broker id: " + bid) val producer = syncProducers.get(bid) if(producer != null) { if(producerRequests.size > 1) @@ -117,14 +115,14 @@ partition = producerRequests(0).partition, messages = producerRequests(0).messages) config.compressionCodec match { - case NoCompressionCodec => logger.debug("Sending message to broker " + bid) - case _ => logger.debug("Sending compressed messages to broker " + bid) + case NoCompressionCodec => debug("Sending message to broker " + bid) + case _ => debug("Sending compressed messages to broker " + bid) } }else throw new UnavailableProducerException("Producer pool has not been initialized correctly. " + "Sync Producer for broker " + bid + " does not exist in the pool") }else { - logger.debug("Fetching async producer for broker id: " + bid) + debug("Fetching async producer for broker id: " + bid) val producer = asyncProducers.get(bid) if(producer != null) { requestsForThisBid._1.foreach { req => @@ -132,8 +130,8 @@ } if(logger.isDebugEnabled) config.compressionCodec match { - case NoCompressionCodec => logger.debug("Sending message") - case _ => logger.debug("Sending compressed messages") + case NoCompressionCodec => debug("Sending message") + case _ => debug("Sending compressed messages") } } else @@ -149,12 +147,12 @@ def close() = { config.producerType match { case "sync" => - logger.info("Closing all sync producers") + info("Closing all sync producers") val iter = syncProducers.values.iterator while(iter.hasNext) iter.next.close case "async" => - logger.info("Closing all async producers") + info("Closing all async producers") val iter = asyncProducers.values.iterator while(iter.hasNext) iter.next.close Index: core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (working copy) @@ -19,7 +19,7 @@ import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig} import collection.mutable.HashMap import collection.mutable.Map -import org.apache.log4j.Logger +import kafka.utils.LogHelper import collection.immutable.TreeSet import kafka.cluster.{Broker, Partition} import org.apache.zookeeper.Watcher.Event.KeeperState @@ -57,8 +57,7 @@ * If zookeeper based auto partition discovery is enabled, fetch broker info like * host, port, number of partitions from zookeeper */ -private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo { - private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo]) +private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo with LogHelper { private val zkWatcherLock = new Object private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) @@ -75,7 +74,7 @@ // register listener for change of brokers for each topic to keep topicsBrokerPartitions updated topicBrokerPartitions.keySet.foreach {topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener) - logger.debug("Registering listener on path: " + ZkUtils.BrokerTopicsPath + "/" + topic) + debug("Registering listener on path: " + 
ZkUtils.BrokerTopicsPath + "/" + topic) } // register listener for new broker @@ -138,20 +137,17 @@ } private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = { - if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic) - if(logger.isDebugEnabled) - logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " + + debug("Currently, no brokers are registered under topic: " + topic) + debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " + "number of partitions = 1") val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath) - if(logger.isTraceEnabled) - logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString) + trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString) // since we do not have the in formation about number of partitions on these brokers, just assume single partition // i.e. pick partition 0 from each broker as a candidate val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0)) // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers // participate in hosting this topic. - if(logger.isDebugEnabled) - logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString) + debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString) numBrokerPartitions } @@ -171,8 +167,7 @@ val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt) val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions) val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1) - if(logger.isDebugEnabled) - logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString) + debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString) var brokerParts = SortedSet.empty[Partition] sortedBrokerPartitions.foreach { bp => @@ -182,8 +177,7 @@ } } brokerPartitionsPerTopic += (topic -> brokerParts) - if(logger.isDebugEnabled) - logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString) + debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString) } brokerPartitionsPerTopic } @@ -208,17 +202,14 @@ * keeps the related data structures updated */ class BrokerTopicsListener(val originalBrokerTopicsPartitionsMap: collection.mutable.Map[String, SortedSet[Partition]], - val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener { + val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener with LogHelper { private var oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ originalBrokerTopicsPartitionsMap private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap - private val logger = Logger.getLogger(classOf[BrokerTopicsListener]) - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + + debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + "/broker/topics, 
/broker/topics/topic, /broker/ids") - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + + debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + oldBrokerTopicPartitionsMap.toString) @throws(classOf[Exception]) @@ -227,22 +218,18 @@ else new java.util.ArrayList[String]() zkWatcherLock synchronized { - if(logger.isTraceEnabled) - logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString) + trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString) import scala.collection.JavaConversions._ parentPath match { case "/brokers/topics" => // this is a watcher for /broker/topics path val updatedTopics = asBuffer(curChilds) - if(logger.isDebugEnabled) { - logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " + + debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " + curChilds.toString) - logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString) - logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString) - } + debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString) + debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString) val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString) + debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString) newTopics.foreach { topic => val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath) @@ -251,16 +238,14 @@ brokerTopicsListener) } case "/brokers/ids" => // this is a watcher for /broker/ids path - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath + + debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath + "\t Currently registered list of brokers -> " + curChilds.toString) processBrokerChange(parentPath, curChilds) case _ => val pathSplits = parentPath.split("/") val topic = pathSplits.last if(pathSplits.length == 4 && pathSplits(2).equals("topics")) { - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + + debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + curChilds.toString + " for topic -> " + topic) processNewBrokerInExistingTopic(topic, asBuffer(curChilds)) } @@ -277,18 +262,17 @@ import scala.collection.JavaConversions._ val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt) val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet - if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString) + debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString) newBrokers.foreach { bid => val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid) val brokerHostPort = brokerInfo.split(":") allBrokers += (bid -> new Broker(bid, 
brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt)) - if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid) + debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid) producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt) } // remove dead brokers from the in memory list of live brokers val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString) + debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString) deadBrokers.foreach {bid => allBrokers = allBrokers - bid // also remove this dead broker from particular topics @@ -297,8 +281,7 @@ case Some(oldBrokerPartitionList) => val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid) topicBrokerPartitions += (topic -> aliveBrokerPartitionList) - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " + + debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " + "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString) case None => } @@ -317,23 +300,20 @@ // find the old list of brokers for this topic oldBrokerTopicPartitionsMap.get(topic) match { case Some(brokersParts) => - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString) + debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString) case None => } val updatedBrokerList = curChilds.map(b => b.toInt) import ZKBrokerPartitionInfo._ val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList) - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " + + debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " + curChilds.toString) // update the number of partitions on existing brokers var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts topicBrokerPartitions.get(topic) match { case Some(oldBrokerParts) => - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " + + debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " + oldBrokerParts.toString) mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts case None => @@ -341,24 +321,19 @@ // keep only brokers that are alive mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId)) topicBrokerPartitions += (topic -> mergedBrokerParts) - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + + debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString) } def resetState = { - if(logger.isTraceEnabled) - logger.trace("[BrokerTopicsListener] Before reseting broker topic partitions state " + + trace("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString) oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions - if(logger.isDebugEnabled) - 
logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + + debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString) - if(logger.isTraceEnabled) - logger.trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString) + trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString) oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers - if(logger.isDebugEnabled) - logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString) + debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString) } } @@ -386,7 +361,7 @@ * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a * connection for us. */ - logger.info("ZK expired; release old list of broker partitions for topics ") + info("ZK expired; release old list of broker partitions for topics ") topicBrokerPartitions = getZKTopicPartitionInfo allBrokers = getZKBrokerInfo brokerTopicsListener.resetState Index: core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (working copy) @@ -18,13 +18,11 @@ import collection.mutable.HashMap import collection.mutable.Map -import org.apache.log4j.Logger import collection.SortedSet import kafka.cluster.{Broker, Partition} import kafka.common.InvalidConfigException private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo { - private val logger = Logger.getLogger(classOf[ConfigBrokerPartitionInfo]) private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo private val allBrokers = getConfigBrokerInfo Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -24,7 +24,6 @@ import kafka.utils._ import kafka.api._ import scala.math._ -import org.apache.log4j.{Level, Logger} import kafka.common.MessageSizeTooLargeException import java.nio.ByteBuffer @@ -36,9 +35,8 @@ * Send a message set. 
*/ @threadsafe -class SyncProducer(val config: SyncProducerConfig) { +class SyncProducer(val config: SyncProducerConfig) extends LogHelper { - private val logger = Logger.getLogger(getClass()) private val MaxConnectBackoffMs = 60000 private var channel : SocketChannel = null private var sentOnConnection = 0 @@ -46,11 +44,11 @@ @volatile private var shutdown: Boolean = false - logger.debug("Instantiating Scala Sync Producer") + debug("Instantiating Scala Sync Producer") private def verifySendBuffer(buffer : ByteBuffer) = { if (logger.isTraceEnabled) { - logger.trace("verifying sendbuffer of size " + buffer.limit) + trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() if (requestTypeId == RequestKeys.MultiProduce) { try { @@ -59,17 +57,17 @@ try { for (messageAndOffset <- produce.messages) if (!messageAndOffset.message.isValid) - logger.trace("topic " + produce.topic + " is invalid") + trace("topic " + produce.topic + " is invalid") } catch { case e: Throwable => - logger.trace("error iterating messages " + e + Utils.stackTrace(e)) + trace("error iterating messages " + e + Utils.stackTrace(e)) } } } catch { case e: Throwable => - logger.trace("error verifying sendbuffer " + e + Utils.stackTrace(e)) + trace("error verifying sendbuffer " + e + Utils.stackTrace(e)) } } } @@ -112,8 +110,7 @@ def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { verifyMessageSize(messages) val setSize = messages.sizeInBytes.asInstanceOf[Int] - if(logger.isTraceEnabled) - logger.trace("Got message set with " + setSize + " bytes to send") + trace("Got message set with " + setSize + " bytes to send") send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages))) } @@ -123,8 +120,7 @@ for (request <- produces) verifyMessageSize(request.messages) val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes) - if(logger.isTraceEnabled) - logger.trace("Got multi message sets with " + setSize + " bytes to send") + trace("Got multi message sets with " + setSize + " bytes to send") send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) } @@ -148,13 +144,13 @@ private def disconnect() { try { if(channel != null) { - logger.info("Disconnecting from " + config.host + ":" + config.port) + info("Disconnecting from " + config.host + ":" + config.port) Utils.swallow(logger.warn, channel.close()) Utils.swallow(logger.warn, channel.socket.close()) channel = null } } catch { - case e: Exception => logger.error("Error on disconnect: ", e) + case e: Exception => error("Error on disconnect: ", e) } } @@ -170,7 +166,7 @@ channel.socket.setSoTimeout(config.socketTimeoutMs) channel.socket.setKeepAlive(true) channel.connect(new InetSocketAddress(config.host, config.port)) - logger.info("Connected to " + config.host + ":" + config.port + " for producing") + info("Connected to " + config.host + ":" + config.port + " for producing") } catch { case e: Exception => { @@ -178,10 +174,10 @@ val endTimeMs = SystemTime.milliseconds if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs) { - logger.error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e) + error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e) throw e } - logger.error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e) + error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e) SystemTime.sleep(connectBackoffMs) connectBackoffMs = min(10 * connectBackoffMs, 
MaxConnectBackoffMs) } @@ -219,8 +215,7 @@ def getNumProduceRequests: Long = produceRequestStats.getNumRequests } -object SyncProducerStats { - private val logger = Logger.getLogger(getClass()) +object SyncProducerStats extends LogHelper { private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats" private val stats = new SyncProducerStats Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName)) Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -17,7 +17,6 @@ package kafka.producer import async.{CallbackHandler, EventHandler} -import org.apache.log4j.Logger import kafka.serializer.Encoder import kafka.utils._ import java.util.Properties @@ -32,13 +31,12 @@ populateProducerPool: Boolean, private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */ /* use the other constructor*/ -{ - private val logger = Logger.getLogger(classOf[Producer[K, V]]) +extends LogHelper { private val hasShutdown = new AtomicBoolean(false) if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList)) throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList)) - logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") + warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") private val random = new java.util.Random // check if zookeeper based auto partition discovery is enabled private val zkEnabled = Utils.propertyExists(config.zkConnect) @@ -115,7 +113,7 @@ var numRetries: Int = 0 while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) { if(numRetries > 0) { - logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again") + info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again") brokerPartitionInfo.updateInfo } @@ -130,7 +128,7 @@ brokerInfoOpt match { case Some(brokerInfo) => - if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + + debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on partition " + brokerIdPartition.get.partId) case None => throw new NoBrokersForPartitionException("Invalid Zookeeper state. 
Failed to get partition for topic: " + @@ -153,12 +151,10 @@ val brokerIdPartition = topicPartitionsList(randomBrokerId) val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get - if(logger.isDebugEnabled) - logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + + debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a randomly chosen partition") val partition = ProducerRequest.RandomPartition - if(logger.isDebugEnabled) - logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " + + debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " + brokerIdPartition.partId) producerPool.getProducerPoolData(pd.getTopic, new Partition(brokerIdPartition.brokerId, partition), @@ -168,11 +164,9 @@ } private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = { - if(logger.isDebugEnabled) - logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) + debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq - if(logger.isDebugEnabled) - logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList) + debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList) val totalNumPartitions = topicPartitionsList.length if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) topicPartitionsList @@ -206,7 +200,7 @@ */ private def producerCbk(bid: Int, host: String, port: Int) = { if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port)) - else logger.debug("Skipping the callback since populateProducerPool = false") + else debug("Skipping the callback since populateProducerPool = false") } /** Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (working copy) @@ -20,7 +20,6 @@ import scala.collection.JavaConversions._ import org.I0Itec.zkclient._ import joptsimple._ -import org.apache.log4j.Logger import java.util.Arrays.asList import java.util.Properties import java.util.Random @@ -31,8 +30,6 @@ object ConsoleProducer { - private val logger = Logger.getLogger(getClass()) - def main(args: Array[String]) { val parser = new OptionParser val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") Index: core/src/main/scala/kafka/producer/async/AsyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducer.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/async/AsyncProducer.scala (working copy) @@ -18,9 +18,8 @@ package kafka.producer.async import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import java.util.concurrent.atomic.AtomicBoolean -import org.apache.log4j.{Level, Logger} import kafka.api.ProducerRequest import kafka.serializer.Encoder import java.lang.management.ManagementFactory @@ -40,8 +39,7 @@ eventHandler: EventHandler[T] = null, eventHandlerProps: Properties = null, cbkHandler: CallbackHandler[T] = null, - 
cbkHandlerProps: Properties = null) { - private val logger = Logger.getLogger(classOf[AsyncProducer[T]]) + cbkHandlerProps: Properties = null) extends LogHelper { private val closed = new AtomicBoolean(false) private val queue = new LinkedBlockingQueue[QueueItem[T]](config.queueSize) // initialize the callback handlers @@ -63,7 +61,7 @@ mbs.unregisterMBean(objName) mbs.registerMBean(asyncProducerStats, objName) }catch { - case e: Exception => logger.warn("can't register AsyncProducerStats") + case e: Exception => warn("can't register AsyncProducerStats") } def this(config: AsyncProducerConfig) { @@ -107,7 +105,7 @@ case e: InterruptedException => val msg = "%s interrupted during enqueue of event %s.".format( getClass.getSimpleName, event.toString) - logger.error(msg) + error(msg) throw new AsyncProducerInterruptedException(msg) } } @@ -117,31 +115,29 @@ if(!added) { asyncProducerStats.recordDroppedEvents - logger.error("Event queue is full of unsent messages, could not send event: " + event.toString) + error("Event queue is full of unsent messages, could not send event: " + event.toString) throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString) }else { - if(logger.isTraceEnabled) { - logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString) - logger.trace("Remaining queue size: " + queue.remainingCapacity) - } + trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString) + trace("Remaining queue size: " + queue.remainingCapacity) } } def close = { if(cbkHandler != null) { cbkHandler.close - logger.info("Closed the callback handler") + info("Closed the callback handler") } closed.set(true) queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1)) - if(logger.isDebugEnabled) - logger.debug("Added shutdown command to the queue") + debug("Added shutdown command to the queue") sendThread.shutdown sendThread.awaitShutdown producer.close - logger.info("Closed AsyncProducer") + info("Closed AsyncProducer") } + import org.apache.log4j.Level // for testing only def setLoggerLevel(level: Level) = logger.setLevel(level) } Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -19,19 +19,17 @@ import collection.mutable.HashMap import collection.mutable.Map -import org.apache.log4j.Logger import kafka.api.ProducerRequest import kafka.serializer.Encoder import java.util.Properties +import kafka.utils.LogHelper import kafka.producer.{ProducerConfig, SyncProducer} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, - val cbkHandler: CallbackHandler[T]) extends EventHandler[T] { + val cbkHandler: CallbackHandler[T]) extends EventHandler[T] with LogHelper { - private val logger = Logger.getLogger(classOf[DefaultEventHandler[T]]) - override def init(props: Properties) { } override def handle(events: Seq[QueueItem[T]], syncProducer: SyncProducer, serializer: Encoder[T]) { @@ -40,7 +38,7 @@ processedEvents = cbkHandler.beforeSendingData(events) if(logger.isTraceEnabled) - processedEvents.foreach(event => logger.trace("Handling event for Topic: %s, Partition: %d" + processedEvents.foreach(event 
=> trace("Handling event for Topic: %s, Partition: %d" .format(event.getTopic, event.getPartition))) send(serialize(collate(processedEvents), serializer), syncProducer) @@ -50,8 +48,7 @@ if(messagesPerTopic.size > 0) { val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray syncProducer.multiSend(requests) - if(logger.isTraceEnabled) - logger.trace("kafka producer sent messages for topics " + messagesPerTopic) + trace("kafka producer sent messages for topics " + messagesPerTopic) } } @@ -69,27 +66,23 @@ ((topicAndEvents._1._1, topicAndEvents._1._2), config.compressionCodec match { case NoCompressionCodec => - if(logger.isTraceEnabled) - logger.trace("Sending %d messages with no compression to topic %s on partition %d" + trace("Sending %d messages with no compression to topic %s on partition %d" .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) case _ => config.compressedTopics.size match { case 0 => - if(logger.isTraceEnabled) - logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d" + trace("Sending %d messages with compression codec %d to topic %s on partition %d" .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) case _ => if(config.compressedTopics.contains(topicAndEvents._1._1)) { - if(logger.isTraceEnabled) - logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d" + trace("Sending %d messages with compression codec %d to topic %s on partition %d" .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) } else { - if(logger.isTraceEnabled) - logger.trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" + trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1, config.compressedTopics.toString)) new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -17,9 +17,8 @@ package kafka.producer.async -import kafka.utils.SystemTime +import kafka.utils.{SystemTime, LogHelper} import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue} -import org.apache.log4j.Logger import collection.mutable.ListBuffer import kafka.serializer.Encoder import kafka.producer.SyncProducer @@ -32,25 +31,23 @@ val cbkHandler: CallbackHandler[T], val queueTime: Long, val batchSize: Int, - val shutdownCommand: Any) extends Thread(threadName) { + val shutdownCommand: Any) extends Thread(threadName) with LogHelper { - private val logger = Logger.getLogger(classOf[ProducerSendThread[T]]) private val shutdownLatch = new CountDownLatch(1) override def run { try { val remainingEvents = processEvents - if(logger.isDebugEnabled) logger.debug("Remaining events = " + remainingEvents.size) + debug("Remaining events = " + 
remainingEvents.size) // handle remaining events if(remainingEvents.size > 0) { - if(logger.isDebugEnabled) - logger.debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size)) + debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size)) tryToHandle(remainingEvents) } }catch { - case e: Exception => logger.error("Error in sending events: ", e) + case e: Exception => error("Error in sending events: ", e) }finally { shutdownLatch.countDown } @@ -60,7 +57,7 @@ def shutdown = { handler.close - logger.info("Shutdown thread complete") + info("Shutdown thread complete") } private def processEvents(): Seq[QueueItem[T]] = { @@ -77,8 +74,7 @@ // returns a null object val expired = currentQueueItem == null if(currentQueueItem != null) { - if(logger.isTraceEnabled) - logger.trace("Dequeued item for topic %s and partition %d" + trace("Dequeued item for topic %s and partition %d" .format(currentQueueItem.getTopic, currentQueueItem.getPartition)) // handle the dequeued current item if(cbkHandler != null) @@ -90,10 +86,8 @@ full = events.size >= batchSize } if(full || expired) { - if(logger.isDebugEnabled) { - if(expired) logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..") - if(full) logger.debug("Batch full. Sending..") - } + if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..") + if(full) debug("Batch full. Sending..") // if either queue time has reached or batch size has reached, dispatch to event handler tryToHandle(events) lastSend = SystemTime.milliseconds @@ -104,7 +98,7 @@ throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue" .format(queue.size)) if(cbkHandler != null) { - logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size)) + info("Invoking the callback handler before handling the last batch of %d events".format(events.size)) val addedEvents = cbkHandler.lastBatchBeforeClose logEvents("last batch before close", addedEvents) events = events ++ addedEvents @@ -114,19 +108,19 @@ def tryToHandle(events: Seq[QueueItem[T]]) { try { - if(logger.isDebugEnabled) logger.debug("Handling " + events.size + " events") + debug("Handling " + events.size + " events") if(events.size > 0) handler.handle(events, underlyingProducer, serializer) }catch { - case e: Exception => logger.error("Error in handling batch of " + events.size + " events", e) + case e: Exception => error("Error in handling batch of " + events.size + " events", e) } } private def logEvents(tag: String, events: Iterable[QueueItem[T]]) { if(logger.isTraceEnabled) { - logger.trace("events for " + tag + ":") + trace("events for " + tag + ":") for (event <- events) - logger.trace(event.getData.toString) + trace(event.getData.toString) } } } Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala =================================================================== --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1200390) +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy) @@ -20,19 +20,18 @@ import async.MissingConfigException import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.{Logger, AppenderSkeleton} -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.serializer.Encoder import java.util.{Properties, Date} import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} -class KafkaLog4jAppender extends AppenderSkeleton { 
+class KafkaLog4jAppender extends AppenderSkeleton with LogHelper { var port:Int = 0 var host:String = null var topic:String = null var encoderClass:String = null private var producer:SyncProducer = null - private val logger = Logger.getLogger(classOf[KafkaLog4jAppender]) private var encoder: Encoder[AnyRef] = null def getPort:Int = port @@ -56,7 +55,7 @@ if(topic == null) throw new MissingConfigException("topic must be specified by the Kafka log4j appender") if(encoderClass == null) { - logger.info("Using default encoder - kafka.producer.DefaultStringEncoder") + info("Using default encoder - kafka.producer.DefaultStringEncoder") encoder = Utils.getObject("kafka.producer.DefaultStringEncoder") }else // instantiate the encoder, if present encoder = Utils.getObject(encoderClass) @@ -64,15 +63,13 @@ props.put("host", host) props.put("port", port.toString) producer = new SyncProducer(new SyncProducerConfig(props)) - logger.info("Kafka producer connected to " + host + "," + port) - logger.info("Logging for topic: " + topic) + info("Kafka producer connected to " + host + "," + port) + info("Logging for topic: " + topic) } override def append(event: LoggingEvent) = { - if (logger.isDebugEnabled){ - logger.debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage + + debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage + " for " + host + "," + port) - } val message = encoder.toMessage(event) producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message)) } Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1200390) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -18,7 +18,7 @@ package kafka.message import scala.collection.mutable -import org.apache.log4j.Logger +import kafka.utils.LogHelper import kafka.common.{InvalidMessageSizeException, ErrorMapping} import java.nio.ByteBuffer import java.nio.channels.WritableByteChannel @@ -36,8 +36,7 @@ */ class ByteBufferMessageSet(private val buffer: ByteBuffer, private val initialOffset: Long = 0L, - private val errorCode: Int = ErrorMapping.NoError) extends MessageSet { - private val logger = Logger.getLogger(getClass()) + private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with LogHelper { private var validByteCount = -1L private var shallowValidByteCount = -1L @@ -94,10 +93,9 @@ val size = topIter.getInt() lastMessageSize = size - if(logger.isTraceEnabled) { - logger.trace("Remaining bytes in iterator = " + topIter.remaining) - logger.trace("size of data = " + size) - } + trace("Remaining bytes in iterator = " + topIter.remaining) + trace("size of data = " + size) + if(size < 0 || topIter.remaining < size) { if (currValidBytes == initialOffset || size < 0) throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " + @@ -111,16 +109,13 @@ val newMessage = new Message(message) newMessage.compressionCodec match { case NoCompressionCodec => - if(logger.isDebugEnabled) - logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) + debug("Message is uncompressed. 
Valid byte count = %d".format(currValidBytes)) innerIter = null currValidBytes += 4 + size - if(logger.isTraceEnabled) - logger.trace("currValidBytes = " + currValidBytes) + trace("currValidBytes = " + currValidBytes) new MessageAndOffset(newMessage, currValidBytes) case _ => - if(logger.isDebugEnabled) - logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) + debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) innerIter = CompressionUtils.decompress(newMessage).deepIterator if (!innerIter.hasNext) { currValidBytes += 4 + lastMessageSize @@ -132,8 +127,7 @@ override def makeNext(): MessageAndOffset = { val isInnerDone = innerDone() - if(logger.isDebugEnabled) - logger.debug("makeNext() in deepIterator: innerDone = " + isInnerDone) + debug("makeNext() in deepIterator: innerDone = " + isInnerDone) isInnerDone match { case true => makeNextOuter case false => { Index: core/src/main/scala/kafka/message/FileMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/FileMessageSet.scala (revision 1200390) +++ core/src/main/scala/kafka/message/FileMessageSet.scala (working copy) @@ -21,7 +21,6 @@ import java.nio._ import java.nio.channels._ import java.util.concurrent.atomic._ -import org.apache.log4j.Logger import kafka._ import kafka.message._ @@ -38,11 +37,10 @@ private[message] val offset: Long, private[message] val limit: Long, val mutable: Boolean, - val needRecover: AtomicBoolean) extends MessageSet { + val needRecover: AtomicBoolean) extends MessageSet with LogHelper { private val setSize = new AtomicLong() private val setHighWaterMark = new AtomicLong() - private val logger = Logger.getLogger(classOf[FileMessageSet]) if(mutable) { if(limit < Long.MaxValue || offset > 0) @@ -52,7 +50,7 @@ // set the file position to the end of the file for appending messages val startMs = System.currentTimeMillis val truncated = recover() - logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 + + info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 + " seconds. " + truncated + " bytes truncated.") } else { @@ -63,8 +61,7 @@ } else { setSize.set(scala.math.min(channel.size(), limit) - offset) setHighWaterMark.set(sizeInBytes) - if(logger.isDebugEnabled) - logger.debug("initializing high water mark in immutable mode: " + highWaterMark) + debug("initializing high water mark in immutable mode: " + highWaterMark) } /** @@ -174,11 +171,9 @@ channel.force(true) val elapsedTime = SystemTime.milliseconds - startTime LogFlushStats.recordFlushRequest(elapsedTime) - if (logger.isDebugEnabled) - logger.debug("flush time " + elapsedTime) + debug("flush time " + elapsedTime) setHighWaterMark.set(sizeInBytes) - if(logger.isDebugEnabled) - logger.debug("flush high water mark:" + highWaterMark) + debug("flush high water mark:" + highWaterMark) } /** @@ -207,8 +202,7 @@ channel.truncate(validUpTo) setSize.set(validUpTo) setHighWaterMark.set(validUpTo) - if(logger.isDebugEnabled) - logger.info("recover high water mark:" + highWaterMark) + info("recover high water mark:" + highWaterMark) /* This should not be necessary, but fixes bug 6191269 on some OSs. 
*/ channel.position(validUpTo) needRecover.set(false) @@ -279,8 +273,7 @@ def getNumFlushes: Long = flushRequestStats.getNumRequests } -object LogFlushStats { - private val logger = Logger.getLogger(getClass()) +object LogFlushStats extends LogHelper { private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats" private val stats = new LogFlushStats Utils.swallow(logger.warn, Utils.registerMBean(stats, LogFlushStatsMBeanName)) Index: core/src/main/scala/kafka/message/CompressionUtils.scala =================================================================== --- core/src/main/scala/kafka/message/CompressionUtils.scala (revision 1200390) +++ core/src/main/scala/kafka/message/CompressionUtils.scala (working copy) @@ -23,10 +23,9 @@ import java.util.zip.GZIPInputStream import java.util.zip.GZIPOutputStream import java.nio.ByteBuffer -import org.apache.log4j.Logger +import kafka.utils.LogHelper -object CompressionUtils { - private val logger = Logger.getLogger(getClass) +object CompressionUtils extends LogHelper { def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec) @@ -34,8 +33,7 @@ case DefaultCompressionCodec => val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream() val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream) - if(logger.isDebugEnabled) - logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) + debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) messages.foreach(m => m.serializeTo(messageByteBuffer)) @@ -44,7 +42,7 @@ try { gzipOutput.write(messageByteBuffer.array) } catch { - case e: IOException => logger.error("Error while writing to the GZIP output stream", e) + case e: IOException => error("Error while writing to the GZIP output stream", e) if(gzipOutput != null) gzipOutput.close(); if(outputStream != null) outputStream.close() throw e @@ -58,8 +56,7 @@ case GZIPCompressionCodec => val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream() val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream) - if(logger.isDebugEnabled) - logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) + debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) messages.foreach(m => m.serializeTo(messageByteBuffer)) @@ -68,7 +65,7 @@ try { gzipOutput.write(messageByteBuffer.array) } catch { - case e: IOException => logger.error("Error while writing to the GZIP output stream", e) + case e: IOException => error("Error while writing to the GZIP output stream", e) if(gzipOutput != null) gzipOutput.close() if(outputStream != null) @@ -99,7 +96,7 @@ outputStream.write(intermediateBuffer, 0, dataRead) } }catch { - case e: IOException => logger.error("Error while reading from the GZIP input stream", e) + case e: IOException => error("Error while reading from the GZIP input stream", e) if(gzipIn != null) gzipIn.close if(outputStream != null) outputStream.close throw e @@ -124,7 +121,7 @@ outputStream.write(intermediateBuffer, 0, dataRead) } }catch { - case e: IOException => logger.error("Error while reading from the GZIP input stream", e) + case e: IOException => error("Error while reading from the GZIP input stream", e) if(gzipIn != null) gzipIn.close if(outputStream != null) outputStream.close throw e 
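Note for reviewers: every file touched by this patch mixes in kafka.utils.LogHelper, but the trait itself is defined in a hunk that is not shown in this section. The sketch below is only an assumption about its shape, inferred from how the converted call sites use it (a per-class logger field that is still referenced directly by Utils.swallow(logger.warn, ...) and by AsyncProducer.setLoggerLevel, plus trace/debug/info/warn/error/fatal helpers); the by-name message parameters are what would make it safe to drop the explicit if(logger.isDebugEnabled) / if(logger.isTraceEnabled) guards at the converted call sites, since the message string is then only built when the level is enabled.

import org.apache.log4j.Logger

trait LogHelper {
  // per-class logger, kept visible so call sites such as Utils.swallow(logger.warn, ...)
  // and AsyncProducer.setLoggerLevel can still reach it directly
  val logger: Logger = Logger.getLogger(getClass())

  // by-name message arguments: the concatenated string is only evaluated when the
  // corresponding level is enabled, so callers no longer need their own level guards
  def trace(msg: => String): Unit = if (logger.isTraceEnabled) logger.trace(msg)
  def debug(msg: => String): Unit = if (logger.isDebugEnabled) logger.debug(msg)
  def info(msg: => String): Unit  = if (logger.isInfoEnabled) logger.info(msg)
  def warn(msg: => String): Unit  = logger.warn(msg)
  def error(msg: => String): Unit = logger.error(msg)
  def error(msg: => String, e: Throwable): Unit = logger.error(msg, e)
  def fatal(msg: => String): Unit = logger.fatal(msg)
  def fatal(msg: => String, e: Throwable): Unit = logger.fatal(msg, e)
  def fatal(e: Throwable): Unit   = logger.fatal(e)
}

With such a trait in scope, a class only declares the mixin and calls the helpers directly, e.g. class SyncProducer(val config: SyncProducerConfig) extends LogHelper { debug("Instantiating Scala Sync Producer") ... }, which is the pattern every hunk in this patch follows.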
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1200390) +++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy) @@ -18,7 +18,6 @@ package kafka.server import kafka.utils._ -import org.apache.log4j.Logger import kafka.cluster.Broker import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -32,10 +31,8 @@ * /brokers/[0...N] --> host:port * */ -class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) { +class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends LogHelper { - private val logger = Logger.getLogger(classOf[KafkaZooKeeper]) - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId var zkClient: ZkClient = null var topics: List[String] = Nil @@ -43,13 +40,13 @@ def startup() { /* start client */ - logger.info("connecting to ZK: " + config.zkConnect) + info("connecting to ZK: " + config.zkConnect) zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) zkClient.subscribeStateChanges(new SessionExpireListener) } def registerBrokerInZk() { - logger.info("Registering broker " + brokerIdPath) + info("Registering broker " + brokerIdPath) val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName val creatorId = hostName + "-" + System.currentTimeMillis val broker = new Broker(config.brokerId, creatorId, hostName, config.port) @@ -62,7 +59,7 @@ "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.") } - logger.info("Registering broker " + brokerIdPath + " succeeded with " + broker) + info("Registering broker " + brokerIdPath + " succeeded with " + broker) } def registerTopicInZk(topic: String) { @@ -75,9 +72,9 @@ def registerTopicInZkInternal(topic: String) { val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + config.brokerId val numParts = logManager.getTopicPartitionsMap.getOrElse(topic, config.numPartitions) - logger.info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions") + info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions") ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerTopicPath, numParts.toString) - logger.info("End registering broker topic " + brokerTopicPath) + info("End registering broker topic " + brokerTopicPath) } /** @@ -99,20 +96,20 @@ */ @throws(classOf[Exception]) def handleNewSession() { - logger.info("re-registering broker info in ZK for broker " + config.brokerId) + info("re-registering broker info in ZK for broker " + config.brokerId) registerBrokerInZk() lock synchronized { - logger.info("re-registering broker topics in ZK for broker " + config.brokerId) + info("re-registering broker topics in ZK for broker " + config.brokerId) for (topic <- topics) registerTopicInZkInternal(topic) } - logger.info("done re-registering broker") + info("done re-registering broker") } } def close() { if (zkClient != null) { - logger.info("Closing zookeeper client...") + info("Closing zookeeper client...") zkClient.close() } } Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1200390) 
+++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -18,11 +18,10 @@ package kafka.server import scala.reflect.BeanProperty -import org.apache.log4j.Logger import kafka.log.LogManager import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean -import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler} +import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, LogHelper} import kafka.network.{SocketServerStats, SocketServer} import java.io.File @@ -30,11 +29,10 @@ * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ -class KafkaServer(val config: KafkaConfig) { +class KafkaServer(val config: KafkaConfig) extends LogHelper { val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown" private val isShuttingDown = new AtomicBoolean(false) - private val logger = Logger.getLogger(classOf[KafkaServer]) private val shutdownLatch = new CountDownLatch(1) private val statsMBeanName = "kafka:type=kafka.SocketServerStats" @@ -50,7 +48,7 @@ */ def startup() { try { - logger.info("Starting Kafka server...") + info("Starting Kafka server...") var needRecovery = true val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) if (cleanShutDownFile.exists) { @@ -78,11 +76,11 @@ * So this should happen after socket server start. */ logManager.startup - logger.info("Server started.") + info("Server started.") } catch { case e => - logger.fatal("Fatal error during startup.", e) + fatal("Fatal error during startup.", e) shutdown } } @@ -94,7 +92,7 @@ def shutdown() { val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { - logger.info("Shutting down...") + info("Shutting down...") try { scheduler.shutdown() if (socketServer != null) @@ -108,11 +106,11 @@ } catch { case e => - logger.fatal(e) - logger.fatal(Utils.stackTrace(e)) + fatal(e) + fatal(Utils.stackTrace(e)) } shutdownLatch.countDown() - logger.info("shut down completed") + info("shut down completed") } } Index: core/src/main/scala/kafka/server/KafkaServerStartable.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServerStartable.scala (revision 1200390) +++ core/src/main/scala/kafka/server/KafkaServerStartable.scala (working copy) @@ -17,11 +17,10 @@ package kafka.server -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.consumer._ import kafka.producer.{ProducerData, ProducerConfig, Producer} import kafka.message.Message -import org.apache.log4j.Logger import java.util.concurrent.CountDownLatch import scala.collection.Map @@ -62,10 +61,8 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig, private val producerConfig: ProducerConfig, - private val kafkaServer: KafkaServer) extends TopicEventHandler[String] { + private val kafkaServer: KafkaServer) extends TopicEventHandler[String] with LogHelper { - private val logger = Logger.getLogger(getClass) - private val whiteListTopics = consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim) @@ -96,16 +93,16 @@ val addedTopics = newMirrorTopics filterNot (mirrorTopics contains) if (addedTopics.nonEmpty) - logger.info("topic event: added topics = %s".format(addedTopics)) + info("topic event: added topics = %s".format(addedTopics)) val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains) if (deletedTopics.nonEmpty) - logger.info("topic event: deleted topics = %s".format(deletedTopics)) + 
info("topic event: deleted topics = %s".format(deletedTopics)) mirrorTopics = newMirrorTopics if (addedTopics.nonEmpty || deletedTopics.nonEmpty) { - logger.info("mirror topics = %s".format(mirrorTopics)) + info("mirror topics = %s".format(mirrorTopics)) startNewConsumerThreads(makeTopicMap(mirrorTopics)) } } @@ -142,7 +139,7 @@ threadList.foreach(_.start) } else - logger.info("Not starting mirroring threads (mirror topic list is empty)") + info("Not starting mirroring threads (mirror topic list is empty)") } def startup() { @@ -157,29 +154,28 @@ // first shutdown the topic watcher to prevent creating new consumer streams if (topicEventWatcher != null) topicEventWatcher.shutdown() - logger.info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers") + info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers") // stop pulling more data for mirroring if (consumerConnector != null) consumerConnector.shutdown() - logger.info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads") + info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads") // wait for all mirroring threads to stop threadList.foreach(_.shutdown) - logger.info("Stopped all existing mirroring threads, now stopping the producer") + info("Stopped all existing mirroring threads, now stopping the producer") // only then, shutdown the producer producer.close() - logger.info("Successfully shutdown this Kafka mirror") + info("Successfully shutdown this Kafka mirror") } - class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread { + class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with LogHelper { val shutdownComplete = new CountDownLatch(1) val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId) this.setDaemon(false) this.setName(name) - private val logger = Logger.getLogger(name) override def run = { - logger.info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) + info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) try { for (message <- stream) { @@ -189,11 +185,11 @@ } catch { case e => - logger.fatal(e + Utils.stackTrace(e)) - logger.fatal(topic + " stream " + threadId + " unexpectedly exited") + fatal(e + Utils.stackTrace(e)) + fatal(topic + " stream " + threadId + " unexpectedly exited") }finally { shutdownComplete.countDown - logger.info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) + info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) } } @@ -201,7 +197,7 @@ try { shutdownComplete.await }catch { - case e: InterruptedException => logger.fatal("Shutdown of thread " + name + " interrupted. " + + case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. 
" + "Mirroring thread might leak data!") } } Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1200390) +++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) @@ -18,7 +18,6 @@ package kafka.server import java.nio.channels._ -import org.apache.log4j.Logger import kafka.producer._ import kafka.consumer._ import kafka.log._ @@ -27,15 +26,15 @@ import kafka.server._ import kafka.api._ import kafka.common.ErrorMapping -import kafka.utils.{Utils, SystemTime} +import kafka.utils.{Utils, SystemTime, LogHelper} import java.io.IOException +import org.apache.log4j.Logger /** * Logic to handle the various Kafka requests */ -private[kafka] class KafkaRequestHandlers(val logManager: LogManager) { +private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends LogHelper { - private val logger = Logger.getLogger(classOf[KafkaRequestHandlers]) private val requestLogger = Logger.getLogger("kafka.request.logger") def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = { @@ -56,8 +55,7 @@ if(requestLogger.isTraceEnabled) requestLogger.trace("Producer request " + request.toString) handleProducerRequest(request, "ProduceRequest") - if (logger.isDebugEnabled) - logger.debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms") + debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms") None } @@ -73,15 +71,14 @@ val partition = request.getTranslatedPartition(logManager.chooseRandomPartition) try { logManager.getOrCreateLog(request.topic, partition).append(request.messages) - if(logger.isTraceEnabled) - logger.trace(request.messages.sizeInBytes + " bytes written to logs.") + trace(request.messages.sizeInBytes + " bytes written to logs.") } catch { case e => - logger.error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) + error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) e match { case _: IOException => - logger.fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) + fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) Runtime.getRuntime.halt(1) case _ => } @@ -111,13 +108,13 @@ private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = { var response: MessageSetSend = null try { - logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition) + trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition) val log = logManager.getOrCreateLog(fetchRequest.topic, fetchRequest.partition) response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize)) } catch { case e => - logger.error("error when processing request " + fetchRequest, e) + error("error when processing request " + fetchRequest, e) response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } response Index: core/src/main/scala/kafka/network/SocketServer.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (revision 1200390) +++ core/src/main/scala/kafka/network/SocketServer.scala (working copy) @@ -40,7 +40,6 @@ private val handlerFactory: Handler.HandlerMapping, val 
maxRequestSize: Int = Int.MaxValue) { - private val logger = Logger.getLogger(classOf[SocketServer]) private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) private var acceptor: Acceptor = new Acceptor(port, processors) Index: core/src/main/scala/kafka/network/Transmission.scala =================================================================== --- core/src/main/scala/kafka/network/Transmission.scala (revision 1200390) +++ core/src/main/scala/kafka/network/Transmission.scala (working copy) @@ -19,15 +19,13 @@ import java.nio._ import java.nio.channels._ -import org.apache.log4j.Logger +import kafka.utils.LogHelper /** * Represents a stateful transfer of data to or from the network */ -private[network] trait Transmission { +private[network] trait Transmission extends LogHelper { - protected val logger: Logger = Logger.getLogger(getClass()) - def complete: Boolean protected def expectIncomplete(): Unit = { @@ -55,8 +53,7 @@ var read = 0 while(!complete) { read = readFrom(channel) - if(logger.isTraceEnabled) - logger.trace(read + " bytes read.") + trace(read + " bytes read.") } read } @@ -74,8 +71,7 @@ var written = 0 while(!complete) { written = writeTo(channel) - if(logger.isTraceEnabled) - logger.trace(written + " bytes written.") + trace(written + " bytes written.") } written } @@ -102,7 +98,7 @@ def complete: Boolean = { if (current == Nil) { if (totalWritten != expectedBytesToWrite) - logger.error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) + error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) return true } else Index: core/src/main/scala/kafka/consumer/KafkaMessageStream.scala =================================================================== --- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (working copy) @@ -18,7 +18,6 @@ package kafka.consumer import java.util.concurrent.BlockingQueue -import org.apache.log4j.Logger import kafka.message.Message import kafka.serializer.{DefaultDecoder, Decoder} @@ -32,7 +31,6 @@ private val decoder: Decoder[T]) extends Iterable[T] with java.lang.Iterable[T]{ - private val logger = Logger.getLogger(getClass()) private val iter: ConsumerIterator[T] = new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder) Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy) @@ -21,7 +21,6 @@ import java.nio._ import java.nio.channels._ import java.util.concurrent.atomic._ -import org.apache.log4j.Logger import kafka.api._ import kafka.common._ import kafka.message._ @@ -35,8 +34,7 @@ class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int, - val bufferSize: Int) { - private val logger = Logger.getLogger(getClass()) + val bufferSize: Int) extends LogHelper { private var channel : SocketChannel = null private val lock = new Object() @@ -45,23 +43,20 @@ val address = new InetSocketAddress(host, port) val channel = SocketChannel.open - if(logger.isDebugEnabled) - logger.debug("Connected to " + address + " for fetching.") + debug("Connected to " + address + " for fetching.") channel.configureBlocking(true) channel.socket.setReceiveBufferSize(bufferSize) 
channel.socket.setSoTimeout(soTimeout) channel.socket.setKeepAlive(true) channel.connect(address) - if(logger.isTraceEnabled) { - logger.trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize) - logger.trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout) - } + trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize) + trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout) + channel } private def close(channel: SocketChannel) = { - if(logger.isDebugEnabled) - logger.debug("Disconnecting from " + channel.socket.getRemoteSocketAddress()) + debug("Disconnecting from " + channel.socket.getRemoteSocketAddress()) Utils.swallow(logger.warn, channel.close()) Utils.swallow(logger.warn, channel.socket.close()) } @@ -90,7 +85,7 @@ response = getResponse } catch { case e : java.io.IOException => - logger.info("fetch reconnect due to " + e) + info("fetch reconnect due to " + e) // retry once try { channel = connect @@ -124,7 +119,7 @@ response = getResponse } catch { case e : java.io.IOException => - logger.info("multifetch reconnect due to " + e) + info("multifetch reconnect due to " + e) // retry once try { channel = connect @@ -160,7 +155,7 @@ response = getResponse } catch { case e : java.io.IOException => - logger.info("getOffsetsBefore reconnect due to " + e) + info("getOffsetsBefore reconnect due to " + e) // retry once try { channel = connect @@ -222,8 +217,7 @@ def getConsumerThroughput: Double = fetchRequestStats.getThroughput } -object SimpleConsumerStats { - private val logger = Logger.getLogger(getClass()) +object SimpleConsumerStats extends LogHelper { private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats" private val stats = new SimpleConsumerStats Utils.swallow(logger.warn, Utils.registerMBean(stats, simpleConsumerstatsMBeanName)) Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -17,8 +17,7 @@ package kafka.consumer -import kafka.utils.IteratorTemplate -import org.apache.log4j.Logger +import kafka.utils.{IteratorTemplate, LogHelper} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.cluster.Partition import kafka.message.{MessageAndOffset, MessageSet, Message} @@ -33,9 +32,8 @@ private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val decoder: Decoder[T]) - extends IteratorTemplate[T] { + extends IteratorTemplate[T] with LogHelper { - private val logger = Logger.getLogger(classOf[ConsumerIterator[T]]) private var current: Iterator[MessageAndOffset] = null private var currentDataChunk: FetchedDataChunk = null private var currentTopicInfo: PartitionTopicInfo = null @@ -46,8 +44,7 @@ if(consumedOffset < 0) throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset)) currentTopicInfo.resetConsumeOffset(consumedOffset) - if(logger.isTraceEnabled) - logger.trace("Setting consumed offset to %d".format(consumedOffset)) + trace("Setting consumed offset to %d".format(consumedOffset)) ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) decodedMessage } @@ -64,14 +61,13 @@ } } if(currentDataChunk eq 
ZookeeperConsumerConnector.shutdownCommand) { - if(logger.isDebugEnabled) - logger.debug("Received the shutdown command") + debug("Received the shutdown command") channel.offer(currentDataChunk) return allDone } else { currentTopicInfo = currentDataChunk.topicInfo if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) { - logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" + error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset) } Index: core/src/main/scala/kafka/consumer/ConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConnector.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala (working copy) @@ -18,8 +18,7 @@ package kafka.consumer import scala.collection._ -import kafka.utils.Utils -import org.apache.log4j.Logger +import kafka.utils.{Utils, LogHelper} import kafka.serializer.{DefaultDecoder, Decoder} /** @@ -48,8 +47,7 @@ def shutdown() } -object Consumer { - private val logger = Logger.getLogger(getClass()) +object Consumer extends LogHelper { private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats" /** Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala =================================================================== --- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy) @@ -18,7 +18,6 @@ package kafka.consumer import java.util.concurrent.CountDownLatch -import org.apache.log4j.Logger import java.nio.channels.{ClosedChannelException, ClosedByInterruptException} import kafka.common.{OffsetOutOfRangeException, ErrorMapping} import kafka.cluster.{Partition, Broker} @@ -32,8 +31,7 @@ val config: ConsumerConfig, val broker: Broker, val partitionTopicInfos: List[PartitionTopicInfo]) - extends Thread(name) { - private val logger = Logger.getLogger(getClass()) + extends Thread(name) with LogHelper { private val shutdownLatch = new CountDownLatch(1) private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs, config.socketBufferSize) @@ -43,43 +41,42 @@ def shutdown(): Unit = { stopped = true interrupt - logger.debug("awaiting shutdown on fetcher " + name) + debug("awaiting shutdown on fetcher " + name) shutdownLatch.await - logger.debug("shutdown of fetcher " + name + " thread complete") + debug("shutdown of fetcher " + name + " thread complete") } override def run() { - for (info <- partitionTopicInfos) - logger.info(name + " start fetching topic: " + info.topic + " part: " + info.partition.partId + " offset: " - + info.getFetchOffset + " from " + broker.host + ":" + broker.port) + for (infopti <- partitionTopicInfos) + info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: " + + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port) try { while (!stopped) { val fetches = partitionTopicInfos.map(info => new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize)) - if (logger.isTraceEnabled) - logger.trace("fetch request: " + fetches.toString) + trace("fetch request: " + fetches.toString) val response = simpleConsumer.multifetch(fetches : _*) var read = 0L 
- for((messages, info) <- response.zip(partitionTopicInfos)) { + for((messages, infopti) <- response.zip(partitionTopicInfos)) { try { var done = false if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { - logger.info("offset for " + info + " out of range") + info("offset for " + infopti + " out of range") // see if we can fix this error - val resetOffset = resetConsumerOffsets(info.topic, info.partition) + val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition) if(resetOffset >= 0) { - info.resetFetchOffset(resetOffset) - info.resetConsumeOffset(resetOffset) + infopti.resetFetchOffset(resetOffset) + infopti.resetConsumeOffset(resetOffset) done = true } } if (!done) - read += info.enqueue(messages, info.getFetchOffset) + read += infopti.enqueue(messages, infopti.getFetchOffset) } catch { case e1: IOException => @@ -88,18 +85,17 @@ case e2 => if (!stopped) { // this is likely a repeatable error, log it and trigger an exception in the consumer - logger.error("error in FetcherRunnable for " + info, e2) - info.enqueueError(e2, info.getFetchOffset) + error("error in FetcherRunnable for " + infopti, e2) + infopti.enqueueError(e2, infopti.getFetchOffset) } // re-throw the exception to stop the fetcher throw e2 } } - if (logger.isTraceEnabled) - logger.trace("fetched bytes: " + read) + trace("fetched bytes: " + read) if(read == 0) { - logger.debug("backing off " + config.backoffIncrementMs + " ms") + debug("backing off " + config.backoffIncrementMs + " ms") Thread.sleep(config.backoffIncrementMs) } } @@ -107,12 +103,12 @@ catch { case e => if (stopped) - logger.info("FecherRunnable " + this + " interrupted") + info("FetcherRunnable " + this + " interrupted") else - logger.error("error in FetcherRunnable ", e) + error("error in FetcherRunnable ", e) } - logger.info("stopping fetcher " + name + " to host " + broker.host) + info("stopping fetcher " + name + " to host " + broker.host) Utils.swallow(logger.info, simpleConsumer.close) shutdownComplete() } @@ -136,7 +132,7 @@ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) // reset manually in zookeeper - logger.info("updating partition " + partition.name + " for topic " + topic + " with " + + info("updating partition " + partition.name + " for topic " + topic + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString) Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy) @@ -21,13 +21,12 @@ import scala.collection.JavaConversions._ import org.I0Itec.zkclient._ import joptsimple._ -import org.apache.log4j.Logger import java.util.Arrays.asList import java.util.Properties import java.util.Random import java.io.PrintStream import kafka.message._ -import kafka.utils.Utils +import kafka.utils.{Utils, LogHelper} import kafka.utils.ZkUtils import kafka.utils.ZKStringSerializer @@ -35,9 +34,7 @@ * Consumer that dumps messages out to standard out.
* */ -object ConsoleConsumer { - - private val logger = Logger.getLogger(getClass()) +object ConsoleConsumer extends LogHelper { def main(args: Array[String]) { val parser = new OptionParser @@ -136,7 +133,7 @@ } catch { case e => if (skipMessageOnError) - logger.error("Error processing message, skipping this message: ", e) + error("Error processing message, skipping this message: ", e) else throw e } @@ -149,7 +146,7 @@ } } } catch { - case e => logger.error("Error processing message, stopping consumer: ", e) + case e => error("Error processing message, stopping consumer: ", e) } System.out.flush() @@ -171,7 +168,7 @@ def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { for(arg <- required) { if(!options.has(arg)) { - logger.error("Missing required argument \"" + arg + "\"") + error("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } @@ -207,7 +204,7 @@ def tryCleanupZookeeper(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId - logger.info("Cleaning up temporary zookeeper data under " + dir + ".") + info("Cleaning up temporary zookeeper data under " + dir + ".") val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) zk.deleteRecursive(dir) zk.close() Index: core/src/main/scala/kafka/consumer/Fetcher.scala =================================================================== --- core/src/main/scala/kafka/consumer/Fetcher.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/Fetcher.scala (working copy) @@ -18,7 +18,6 @@ package kafka.consumer import scala.collection._ -import org.apache.log4j.Logger import kafka.cluster._ import org.I0Itec.zkclient.ZkClient import java.util.concurrent.BlockingQueue @@ -27,7 +26,6 @@ * The fetcher is a background thread that fetches data from a set of servers */ private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) { - private val logger = Logger.getLogger(getClass()) private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0) @volatile private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS Index: core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (working copy) @@ -17,17 +17,14 @@ package kafka.consumer -import org.apache.log4j.Logger import scala.collection.JavaConversions._ -import kafka.utils.{Utils, ZkUtils, ZKStringSerializer} +import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, LogHelper} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState class ZookeeperTopicEventWatcher(val config:ConsumerConfig, - val eventHandler: TopicEventHandler[String]) { + val eventHandler: TopicEventHandler[String]) extends LogHelper { - private val logger = Logger.getLogger(getClass) - val lock = new Object() private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, @@ -60,12 +57,12 @@ zkClient = null } else - logger.warn("Cannot shutdown already shutdown topic event watcher.") + warn("Cannot shutdown already shutdown topic event watcher.") } catch { case e => - logger.fatal(e) - logger.fatal(Utils.stackTrace(e)) + fatal(e) + fatal(Utils.stackTrace(e)) } } } @@ -78,15 +75,15 @@ try { if (zkClient != null) { val latestTopics = 
zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList - logger.debug("all topics: %s".format(latestTopics)) + debug("all topics: %s".format(latestTopics)) eventHandler.handleTopicEvent(latestTopics) } } catch { case e => - logger.fatal(e) - logger.fatal(Utils.stackTrace(e)) + fatal(e) + fatal(Utils.stackTrace(e)) } } } @@ -103,7 +100,7 @@ def handleNewSession() { lock.synchronized { if (zkClient != null) { - logger.info( + info( "ZK expired: resubscribing topic event listener to topic registry") zkClient.subscribeChildChanges( ZkUtils.BrokerTopicsPath, topicEventListener) Index: core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala =================================================================== --- core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (working copy) @@ -18,7 +18,6 @@ package kafka.consumer.storage.sql import java.sql._ -import org.apache.log4j._ import kafka.utils._ import kafka.consumer.storage.OffsetStorage @@ -26,9 +25,8 @@ * An offset storage implementation that uses an oracle database to save offsets */ @nonthreadsafe -class OracleOffsetStorage(val connection: Connection) extends OffsetStorage { +class OracleOffsetStorage(val connection: Connection) extends OffsetStorage with LogHelper { - private val logger: Logger = Logger.getLogger(classOf[OracleOffsetStorage]) private val lock = new Object connection.setAutoCommit(false) @@ -43,8 +41,7 @@ } } - if(logger.isDebugEnabled) - logger.debug("Reserved node " + node + " for topic '" + topic + " offset " + offset) + debug("Reserved node " + node + " for topic '" + topic + " offset " + offset) offset } Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -22,8 +22,8 @@ import java.util.concurrent.atomic._ import kafka.message._ import kafka.cluster._ +import kafka.utils.LogHelper import kafka.common.ErrorMapping -import org.apache.log4j.Logger private[consumer] class PartitionTopicInfo(val topic: String, val brokerId: Int, @@ -31,27 +31,23 @@ private val chunkQueue: BlockingQueue[FetchedDataChunk], private val consumedOffset: AtomicLong, private val fetchedOffset: AtomicLong, - private val fetchSize: AtomicInteger) { - private val logger = Logger.getLogger(getClass()) - if (logger.isDebugEnabled) { - logger.debug("initial consumer offset of " + this + " is " + consumedOffset.get) - logger.debug("initial fetch offset of " + this + " is " + fetchedOffset.get) - } + private val fetchSize: AtomicInteger) extends LogHelper { + debug("initial consumer offset of " + this + " is " + consumedOffset.get) + debug("initial fetch offset of " + this + " is " + fetchedOffset.get) + def getConsumeOffset() = consumedOffset.get def getFetchOffset() = fetchedOffset.get def resetConsumeOffset(newConsumeOffset: Long) = { consumedOffset.set(newConsumeOffset) - if (logger.isDebugEnabled) - logger.debug("reset consume offset of " + this + " to " + newConsumeOffset) + debug("reset consume offset of " + this + " to " + newConsumeOffset) } def resetFetchOffset(newFetchOffset: Long) = { fetchedOffset.set(newFetchOffset) - if (logger.isDebugEnabled) - logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset)) + debug("reset fetch offset of ( %s ) to %d".format(this, 
newFetchOffset)) } /** @@ -62,12 +58,10 @@ val size = messages.validBytes if(size > 0) { // update fetched offset to the compressed data chunk size, not the decompressed message set size - if(logger.isTraceEnabled) - logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size) + trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size) chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) val newOffset = fetchedOffset.addAndGet(size) - if (logger.isDebugEnabled) - logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) + debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) } size } Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (working copy) @@ -18,8 +18,7 @@ package kafka.consumer import java.util.concurrent.atomic.AtomicLong -import org.apache.log4j.Logger -import kafka.utils.{Pool, Utils, threadsafe} +import kafka.utils.{Pool, Utils, threadsafe, LogHelper} trait ConsumerTopicStatMBean { def getMessagesPerTopic: Long @@ -34,8 +33,7 @@ def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) } -object ConsumerTopicStat { - private val logger = Logger.getLogger(getClass()) +object ConsumerTopicStat extends LogHelper { private val stats = new Pool[String, ConsumerTopicStat] def getConsumerTopicStat(topic: String): ConsumerTopicStat = { Index: core/src/main/scala/kafka/consumer/TopicCount.scala =================================================================== --- core/src/main/scala/kafka/consumer/TopicCount.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/TopicCount.scala (working copy) @@ -19,10 +19,9 @@ import scala.collection._ import scala.util.parsing.json.JSON -import org.apache.log4j.Logger +import kafka.utils.LogHelper -private[consumer] object TopicCount { - private val logger = Logger.getLogger(getClass()) +private[consumer] object TopicCount extends LogHelper { val myConversionFunc = {input : String => input.toInt} JSON.globalNumberParser = myConversionFunc @@ -36,7 +35,7 @@ } catch { case e => - logger.error("error parsing consumer json string " + jsonString, e) + error("error parsing consumer json string " + jsonString, e) throw e } Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1200390) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -20,7 +20,6 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import scala.collection._ -import org.apache.log4j.Logger import kafka.cluster._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -85,9 +84,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with ZookeeperConsumerConnectorMBean { + extends ConsumerConnector with ZookeeperConsumerConnectorMBean with LogHelper { - private val logger = Logger.getLogger(getClass()) private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[Fetcher] = None @@ -99,7 +97,7 @@ connectZk() 
createFetcher() if (config.autoCommit) { - logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms") + info("starting auto committer every " + config.autoCommitIntervalMs + " ms") scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs) } @@ -117,14 +115,14 @@ } private def connectZk() { - logger.info("Connecting to zookeeper instance at " + config.zkConnect) + info("Connecting to zookeeper instance at " + config.zkConnect) zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) } def shutdown() { val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { - logger.info("ZKConsumerConnector shutting down") + info("ZKConsumerConnector shutting down") try { scheduler.shutdownNow() fetcher match { @@ -141,16 +139,16 @@ } catch { case e => - logger.fatal("error during consumer connector shutdown", e) + fatal("error during consumer connector shutdown", e) } - logger.info("ZKConsumerConnector shut down completed") + info("ZKConsumerConnector shut down completed") } } def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T]) : Map[String,List[KafkaMessageStream[T]]] = { - logger.debug("entering consume ") + debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") @@ -190,7 +188,7 @@ streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder) } ret += (topic -> streamList) - logger.debug("adding topic " + topic + " and stream to map..") + debug("adding topic " + topic + " and stream to map..") // register on broker partition path changes val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic @@ -204,30 +202,29 @@ } private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { - logger.info("begin registering consumer " + consumerIdString + " in ZK") + info("begin registering consumer " + consumerIdString + " in ZK") ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString) - logger.info("end registering consumer " + consumerIdString + " in ZK") + info("end registering consumer " + consumerIdString + " in ZK") } private def sendShudownToAllQueues() = { for (queue <- queues.values) { - logger.debug("Clearing up queue") + debug("Clearing up queue") queue.clear() queue.put(ZookeeperConsumerConnector.shutdownCommand) - logger.debug("Cleared queue and sent shutdown command") + debug("Cleared queue and sent shutdown command") } } def autoCommit() { - if(logger.isTraceEnabled) - logger.trace("auto committing") + trace("auto committing") try { commitOffsets() } catch { case t: Throwable => // log it and let it go - logger.error("exception during autoCommit: ", t) + error("exception during autoCommit: ", t) } } @@ -245,10 +242,9 @@ catch { case t: Throwable => // log it and let it go - logger.warn("exception during commitOffsets", t) + warn("exception during commitOffsets", t) } - if(logger.isDebugEnabled) - logger.debug("Committed offset " + newOffset + " for topic " + info) + debug("Committed offset " + newOffset + " for topic " + info) } } } @@ -298,7 +294,7 @@ } catch { case e => - logger.error("error in getConsumedOffset JMX ", e) + error("error in getConsumedOffset JMX ", e) } return -2 } @@ -319,7 +315,7 @@ } catch { case e => - logger.error("error in earliestOrLatestOffset() ", e) + error("error in earliestOrLatestOffset() ", e) } 
finally { if (simpleConsumer != null) @@ -352,7 +348,7 @@ * connection for us. We need to release the ownership of the current consumer and re-register this * consumer in the consumer registry and trigger a rebalance. */ - logger.info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString) + info("ZK expired; release old broker partition ownership; re-register consumer " + consumerIdString) loadBalancerListener.resetState registerConsumerInZK(dirs, consumerIdString, topicCount) // explicitly trigger load balancing for this consumer @@ -381,8 +377,7 @@ for(partition <- infos.keys) { val znode = topicDirs.consumerOwnerDir + "/" + partition ZkUtils.deletePath(zkClient, znode) - if(logger.isDebugEnabled) - logger.debug("Consumer " + consumerIdString + " releasing " + znode) + debug("Consumer " + consumerIdString + " releasing " + znode) } } } @@ -431,7 +426,7 @@ def syncedRebalance() { rebalanceLock synchronized { for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) { - logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i) + info("begin rebalancing consumer " + consumerIdString + " try #" + i) var done = false try { done = rebalance() @@ -441,9 +436,9 @@ // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. // For example, a ZK node can disappear between the time we get all children and the time we try to get // the value of a child. Just let this go since another rebalance will be triggered. - logger.info("exception during rebalance ", e) + info("exception during rebalance ", e) } - logger.info("end rebalancing consumer " + consumerIdString + " try #" + i) + info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) return // release all partitions, reset state and retry @@ -463,14 +458,14 @@ val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator) val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap) if (relevantTopicThreadIdsMap.size <= 0) { - logger.info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.") + info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.") return true } - logger.info("Committing all offsets") + info("Committing all offsets") commitOffsets - logger.info("Releasing partition ownership") + info("Releasing partition ownership") releasePartitionOwnership() val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]] @@ -485,7 +480,7 @@ val nPartsPerConsumer = curPartitions.size / curConsumers.size val nConsumersWithExtraPart = curPartitions.size % curConsumers.size - logger.info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + + info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers) for (consumerThreadId <- consumerThreadIdSet) { @@ -499,11 +494,11 @@ * The first few consumers pick up an extra partition, if any.
*/ if (nParts <= 0) - logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) else { for (i <- startPart until startPart + nParts) { val partition = curPartitions(i) - logger.info(consumerThreadId + " attempting to claim partition " + partition) + info(consumerThreadId + " attempting to claim partition " + partition) if (!processPartition(topicDirs, partition, topic, consumerThreadId)) return false } @@ -523,7 +518,7 @@ for (partitionInfos <- topicRegistry.values) for (partition <- partitionInfos.values) allPartitionInfos ::= partition - logger.info("Consumer " + consumerIdString + " selected partitions : " + + info("Consumer " + consumerIdString + " selected partitions : " + allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(",")) fetcher match { @@ -541,7 +536,7 @@ catch { case e: ZkNodeExistsException => // The node hasn't been deleted by the original owner. So wait a bit and retry. - logger.info("waiting for the partition ownership to be deleted: " + partition) + info("waiting for the partition ownership to be deleted: " + partition) return false case e2 => throw e2 } @@ -580,8 +575,7 @@ fetchedOffset, new AtomicInteger(config.fetchSize)) partTopicInfoMap.put(partition, partTopicInfo) - if (logger.isDebugEnabled) - logger.debug(partTopicInfo + " selected new offset " + offset) + debug(partTopicInfo + " selected new offset " + offset) } } } Index: core/src/main/scala/kafka/javaapi/Implicits.scala =================================================================== --- core/src/main/scala/kafka/javaapi/Implicits.scala (revision 1200390) +++ core/src/main/scala/kafka/javaapi/Implicits.scala (working copy) @@ -17,14 +17,12 @@ package kafka.javaapi import java.nio.ByteBuffer -import org.apache.log4j.Logger import kafka.serializer.Encoder import kafka.producer.{ProducerConfig, ProducerPool} import kafka.producer.async.{AsyncProducerConfig, QueueItem} +import kafka.utils.LogHelper -private[javaapi] object Implicits { - private val logger = Logger.getLogger(getClass()) - +private[javaapi] object Implicits extends LogHelper { implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet): kafka.message.ByteBufferMessageSet = messageSet.underlying @@ -35,14 +33,12 @@ } implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = { - if(logger.isDebugEnabled) - logger.debug("Implicit instantiation of Java Sync Producer") + debug("Implicit instantiation of Java Sync Producer") new kafka.javaapi.producer.SyncProducer(producer) } implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = { - if(logger.isDebugEnabled) - logger.debug("Implicit instantiation of Sync Producer") + debug("Implicit instantiation of Sync Producer") producer.underlying } Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (revision 1200390) +++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (working copy) @@ -18,13 +18,11 @@ import java.nio.ByteBuffer import kafka.common.ErrorMapping -import org.apache.log4j.Logger import kafka.message._ class ByteBufferMessageSet(private val buffer: ByteBuffer, private val initialOffset: 
Long = 0L, private val errorCode: Int = ErrorMapping.NoError) extends MessageSet { - private val logger = Logger.getLogger(getClass()) val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset, errorCode)
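Note for reviewers: the kafka.utils.LogHelper trait that these hunks mix into every class and object is not defined anywhere in this part of the patch, so its exact API is an assumption here. A minimal sketch consistent with how it is used above (a per-class logger, level checks folded into the helpers, and message arguments taken by name) would be:

package kafka.utils

import org.apache.log4j.Logger

// Sketch only -- the real LogHelper may differ. It must provide the
// trace/debug/info/warn/error/fatal helpers called in the + lines, plus a
// `logger` member, since FetcherRunnable still invokes
// Utils.swallow(logger.info, simpleConsumer.close) on an unchanged line.
trait LogHelper {
  lazy val logger = Logger.getLogger(this.getClass.getName)

  def trace(msg: => AnyRef) = if (logger.isTraceEnabled) logger.trace(msg)
  def debug(msg: => AnyRef) = if (logger.isDebugEnabled) logger.debug(msg)
  def info(msg: => AnyRef) = if (logger.isInfoEnabled) logger.info(msg)
  def warn(msg: => AnyRef) = logger.warn(msg)
  def warn(msg: => AnyRef, t: Throwable) = logger.warn(msg, t)
  def error(msg: => AnyRef) = logger.error(msg)
  def error(msg: => AnyRef, t: Throwable) = logger.error(msg, t)
  def fatal(msg: => AnyRef) = logger.fatal(msg)
  def fatal(msg: => AnyRef, t: Throwable) = logger.fatal(msg, t)
}

Having an info(...) helper in scope also explains the info -> infopti rename in the FetcherRunnable.scala hunks: a local value named info would otherwise shadow the helper inside the fetch loop.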
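One behavioural point worth checking in review: the explicit if(logger.isDebugEnabled)/isTraceEnabled guards dropped in the - lines only stay cheap if the helpers take their message by name, as in the sketch above; otherwise every call site builds its log string regardless of level. A hypothetical, self-contained demo of the by-name behaviour (LazyLoggingDemo and expensive() are made-up names, not part of the patch):

import org.apache.log4j.{BasicConfigurator, Level, Logger}
import kafka.utils.LogHelper

// Hypothetical demo: a by-name message argument is never evaluated
// when the corresponding log level is disabled.
object LazyLoggingDemo extends LogHelper {
  def main(args: Array[String]) {
    BasicConfigurator.configure()              // plain console appender
    Logger.getRootLogger.setLevel(Level.INFO)  // DEBUG is disabled

    var built = 0
    def expensive() = { built += 1; "payload" }

    debug("received message: " + expensive())  // string is never concatenated
    info("strings built so far: " + built)     // logs "strings built so far: 0"
  }
}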