diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 652dfb8..5d36a01 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -29,6 +29,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var topic: String = null
   var brokerList: String = null
   var compressionType: String = null
+  var retries: Int = 0
   var requiredNumAcks: Int = Int.MaxValue
   var syncSend: Boolean = false
 
@@ -49,6 +50,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   def getSyncSend: Boolean = syncSend
   def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
 
+  def getRetries: Int = retries
+  def setRetries(retries: Int) { this.retries = retries }
+
   override def activateOptions() {
     // check for config parameter validity
     val props = new Properties()
@@ -60,6 +64,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
+    if(retries > 0) props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, retries.toString)
     props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producer = new KafkaProducer[Array[Byte],Array[Byte]](props)