Kafka
  1. Kafka
  2. KAFKA-323

Add the ability to use the async producer in the Log4j appender

    Details

    • Type: Improvement Improvement
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.

      Sample use:

      KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
      kafkaAppender.setZkConnect( "localhost:2181/kafka" );
      kafkaAppender.setTopic( "webapp" );
      kafkaAppender.setProducerType( "async" );
      kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
      kafkaAppender.activateOptions();

      1. appender.patch
        3 kB
        Jose Quinteiro

        Activity

        Hide
        Jose Quinteiro added a comment -

        Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
        ===================================================================
        — core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1307067)
        +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy)
        @@ -22,9 +22,7 @@
        import org.apache.log4j.AppenderSkeleton
        import org.apache.log4j.helpers.LogLog
        import kafka.utils.Logging
        -import kafka.serializer.Encoder
        import java.util.

        {Properties, Date}

        -import kafka.message.Message
        import scala.collection._

        class KafkaLog4jAppender extends AppenderSkeleton with Logging {
        @@ -34,7 +32,11 @@
        var serializerClass:String = null
        var zkConnect:String = null
        var brokerList:String = null
        -
        + var producerType:String = null
        + var compressionCodec:String = null
        + var enqueueTimeout:String = null
        + var queueSize:String = null
        +
        private var producer: Producer[String, String] = null

        def getTopic:String = topic
        @@ -49,6 +51,18 @@
        def getSerializerClass:String = serializerClass
        def setSerializerClass(serializerClass:String)

        { this.serializerClass = serializerClass }

        + def getProducerType:String = producerType
        + def setProducerType(producerType:String)

        { this.producerType = producerType }

        +
        + def getCompressionCodec:String = compressionCodec
        + def setCompressionCodec(compressionCodec:String)

        { this.compressionCodec = compressionCodec }

        +
        + def getEnqueueTimeout:String = enqueueTimeout
        + def setEnqueueTimeout(enqueueTimeout:String)

        { this.enqueueTimeout = enqueueTimeout }

        +
        + def getQueueSize:String = queueSize
        + def setQueueSize(queueSize:String)

        { this.queueSize = queueSize }

        +
        override def activateOptions()

        { val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity @@ -68,6 +82,11 @@ LogLog.warn("Using default encoder - kafka.serializer.StringEncoder") }

        props.put("serializer.class", serializerClass)
        + //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
        + if(producerType != null) props.put("producer.type", producerType)
        + if(compressionCodec != null) props.put("compression.codec", compressionCodec)
        + if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
        + if(queueSize != null) props.put("queue.size", queueSize)
        val config : ProducerConfig = new ProducerConfig(props)
        producer = new Producer[String, String](config)
        LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))

        Show
        Jose Quinteiro added a comment - Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala =================================================================== — core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1307067) +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy) @@ -22,9 +22,7 @@ import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog import kafka.utils.Logging -import kafka.serializer.Encoder import java.util. {Properties, Date} -import kafka.message.Message import scala.collection._ class KafkaLog4jAppender extends AppenderSkeleton with Logging { @@ -34,7 +32,11 @@ var serializerClass:String = null var zkConnect:String = null var brokerList:String = null - + var producerType:String = null + var compressionCodec:String = null + var enqueueTimeout:String = null + var queueSize:String = null + private var producer: Producer [String, String] = null def getTopic:String = topic @@ -49,6 +51,18 @@ def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } + def getProducerType:String = producerType + def setProducerType(producerType:String) { this.producerType = producerType } + + def getCompressionCodec:String = compressionCodec + def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } + + def getEnqueueTimeout:String = enqueueTimeout + def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } + + def getQueueSize:String = queueSize + def setQueueSize(queueSize:String) { this.queueSize = queueSize } + override def activateOptions() { val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity @@ -68,6 +82,11 @@ LogLog.warn("Using default encoder - kafka.serializer.StringEncoder") } props.put("serializer.class", serializerClass) + //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified + if(producerType != null) props.put("producer.type", producerType) + if(compressionCodec != null) props.put("compression.codec", compressionCodec) + if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout) + if(queueSize != null) props.put("queue.size", queueSize) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer [String, String] (config) LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
        Hide
        Neha Narkhede added a comment -

        Thanks for the patch ! Would you mind attaching it as a file, and granting it to Apache ?

        Show
        Neha Narkhede added a comment - Thanks for the patch ! Would you mind attaching it as a file, and granting it to Apache ?

          People

          • Assignee:
            Jay Kreps
            Reporter:
            Jose Quinteiro
          • Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development