Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala =================================================================== --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1307067) +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy) @@ -22,9 +22,7 @@ import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog import kafka.utils.Logging -import kafka.serializer.Encoder import java.util.{Properties, Date} -import kafka.message.Message import scala.collection._ class KafkaLog4jAppender extends AppenderSkeleton with Logging { @@ -34,7 +32,11 @@ var serializerClass:String = null var zkConnect:String = null var brokerList:String = null - + var producerType:String = null + var compressionCodec:String = null + var enqueueTimeout:String = null + var queueSize:String = null + private var producer: Producer[String, String] = null def getTopic:String = topic @@ -49,6 +51,18 @@ def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } + def getProducerType:String = producerType + def setProducerType(producerType:String) { this.producerType = producerType } + + def getCompressionCodec:String = compressionCodec + def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } + + def getEnqueueTimeout:String = enqueueTimeout + def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } + + def getQueueSize:String = queueSize + def setQueueSize(queueSize:String) { this.queueSize = queueSize } + override def activateOptions() { val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity @@ -68,6 +82,11 @@ LogLog.warn("Using default encoder - kafka.serializer.StringEncoder") } props.put("serializer.class", serializerClass) + //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified + if(producerType != null) props.put("producer.type", producerType) + if(compressionCodec != null) props.put("compression.codec", compressionCodec) + if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout) + if(queueSize != null) props.put("queue.size", queueSize) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))