From 83f14045d5b7ce8cb7719dcd24af108560fba065 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 14 Feb 2013 09:10:39 -0500 Subject: [PATCH] KAFKA-524 Defer creating Producer in log4j appender Previously, there was a possibility of a deadlock when using slf4j and newer versions of ZooKeeper. The ZooKeeper client (which also uses slf4j) attempts to write out some logs which will cause an appender to get initialized, but slf4j is still busy initializing the KafkaLog4jAppender - so a deadlock occurs. This fix defers the initialization of the Kafka Producer in the appender until the first log message is produced. This has the downside of some overhead when you first produce a log message, but has the benefit of not deadlocking. Also adding the zk session/connect timeout options (which really should always be included when using ZK) --- .../scala/kafka/producer/KafkaLog4jAppender.scala | 41 ++++++++++++++++--- .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 1 + 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 417da27..315f3ce 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -31,15 +31,26 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var topic:String = null var serializerClass:String = null var zkConnect:String = null + var zkConnectTimeout:String = null + var zkSessionTimeout:String = null var brokerList:String = null private var producer: Producer[String, String] = null + private var config : ProducerConfig = null + private val lock : AnyRef = new Object() + @volatile private var initialized:Boolean = false def getTopic:String = topic def setTopic(topic: String) { this.topic = topic } def getZkConnect:String = zkConnect def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect } + + def getZkConnectTimeout:String = zkConnectTimeout + def setZkConnectTimeout(zkConnectTimeout: String) { this.zkConnectTimeout = zkConnectTimeout } + + def getZkSessionTimeout:String = zkSessionTimeout + def setZkSessionTimeout(zkSessionTimeout: String) { this.zkSessionTimeout = zkSessionTimeout } def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } @@ -51,8 +62,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity val props = new Properties() - if( zkConnect == null) connectDiagnostic += "zkConnect" - else props.put("zk.connect", zkConnect); + + if(zkConnect == null) + connectDiagnostic += "zkConnect" + else { + props.put("zk.connect", zkConnect) + if(zkConnectTimeout != null) + props.put("zk.connectiontimeout.ms", zkConnectTimeout) + if(zkSessionTimeout != null) + props.put("zk.sessiontimeout.ms", zkSessionTimeout) + } + if( brokerList == null) connectDiagnostic += "brokerList" else if( props.isEmpty) props.put("broker.list", brokerList) if(props.isEmpty ) @@ -66,13 +86,19 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { LogLog.warn("Using default encoder - kafka.serializer.StringEncoder") } props.put("serializer.class", serializerClass) - val config : ProducerConfig = new ProducerConfig(props) - producer = new Producer[String, String](config) - LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect)) - LogLog.debug("Logging for topic: " + topic) + config = new ProducerConfig(props) } override def append(event: LoggingEvent) { + // TODO Is AppenderSkeleton#append serialized, if so no need to synchronize here + lock.synchronized { + if(!initialized) { + producer = new Producer[String, String](config) + LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect)) + LogLog.debug("Logging for topic: " + topic) + initialized = true + } + } val message : String = if( this.layout == null) { event.getRenderedMessage } @@ -86,7 +112,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { override def close() { if(!this.closed) { this.closed = true - producer.close() + if(initialized) + producer.close() } } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 7f67eb3..1e93a9f 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -220,6 +220,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite with Logging { props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect) + props.put("log4j.appender.KAFKA.ZkConnectTimeout", "10000") props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") props -- 1.7.5.4