From d1ff110ef2c47085bec4e78873cab1d0e8f73c94 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 27 Apr 2015 19:51:30 -0700 Subject: [PATCH] KAFKA-2132: Move Log4J appender to clients module --- build.gradle | 56 ++++++- checkstyle/import-control.xml | 7 +- .../scala/kafka/producer/KafkaLog4jAppender.scala | 97 ----------- .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 143 ---------------- .../org/apache/kafka/log4j/KafkaLog4jAppender.java | 183 +++++++++++++++++++++ .../apache/kafka/log4j/KafkaLog4jAppenderTest.java | 99 +++++++++++ .../apache/kafka/log4j/MockKafkaLog4jAppender.java | 45 +++++ settings.gradle | 2 +- 8 files changed, 383 insertions(+), 249 deletions(-) delete mode 100644 core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala delete mode 100755 core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala create mode 100644 log4j/src/main/java/org/apache/kafka/log4j/KafkaLog4jAppender.java create mode 100644 log4j/src/test/java/org/apache/kafka/log4j/KafkaLog4jAppenderTest.java create mode 100644 log4j/src/test/java/org/apache/kafka/log4j/MockKafkaLog4jAppender.java diff --git a/build.gradle b/build.gradle index 006ced4..075b92e 100644 --- a/build.gradle +++ b/build.gradle @@ -109,7 +109,7 @@ subprojects { archives srcJar archives javadocJar } - + plugins.withType(ScalaPlugin) { //source jar should also contain scala source: srcJar.from sourceSets.main.scala @@ -179,20 +179,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) { } } -tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) { +tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j:jar']) { } -tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { } +tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j:srcJar']) { } -tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { } +tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j:docsJar']) { } -tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test']) { +tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j:test']) { } tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) { } -tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 
'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j:uploadArchives']) {
 }
 
 project(':core') {
@@ -205,6 +205,7 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
+    compile project(':log4j')
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
     compile 'com.101tec:zkclient:0.3'
@@ -215,6 +216,7 @@ project(':core') {
     testCompile 'org.easymock:easymock:3.0'
     testCompile 'org.objenesis:objenesis:1.2'
     testCompile project(':clients')
+    testCompile project(':log4j')
     if (scalaVersion.startsWith('2.10')) {
       testCompile 'org.scalatest:scalatest_2.10:1.9.1'
     } else if (scalaVersion.startsWith('2.11')) {
@@ -382,7 +384,47 @@ project(':clients') {
   artifacts {
     archives testJar
   }
-  
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':log4j') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "kafka-log4j"
+
+  dependencies {
+    compile project(':clients')
+    compile 'org.xerial.snappy:snappy-java:1.1.1.6'
+    compile 'net.jpountz.lz4:lz4:1.2.0'
+    compile "$slf4jlog4j"
+
+    testCompile 'com.novocode:junit-interface:0.9'
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/log4j/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
   }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f2e6cec..4dd0b1a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,8 +95,13 @@
     <allow pkg="org.apache.kafka.common.utils" />
   </subpackage>
 
   <subpackage name="test">
     <allow pkg="org.apache.kafka" />
   </subpackage>
 
+  <subpackage name="log4j">
+    <allow pkg="org.apache.log4j" />
+    <allow pkg="org.apache.kafka.clients.producer" />
+    <allow pkg="org.apache.kafka.common" />
+  </subpackage>
+
-</import-control> 
+</import-control>
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
deleted file mode 100644
index 5d36a01..0000000
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/ - -package kafka.producer - -import async.MissingConfigException -import org.apache.log4j.spi.LoggingEvent -import org.apache.log4j.AppenderSkeleton -import org.apache.log4j.helpers.LogLog -import kafka.utils.Logging -import java.util.{Properties, Date} -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} - -class KafkaLog4jAppender extends AppenderSkeleton with Logging { - var topic: String = null - var brokerList: String = null - var compressionType: String = null - var retries: Int = 0 - var requiredNumAcks: Int = Int.MaxValue - var syncSend: Boolean = false - - private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null - - def getTopic: String = topic - def setTopic(topic: String) { this.topic = topic } - - def getBrokerList: String = brokerList - def setBrokerList(brokerList: String) { this.brokerList = brokerList } - - def getCompressionType: String = compressionType - def setCompressionType(compressionType: String) { this.compressionType = compressionType } - - def getRequiredNumAcks: Int = requiredNumAcks - def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks } - - def getSyncSend: Boolean = syncSend - def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend } - - def getRetries: Int = retries - def setRetries(retries: Int) { this.retries = retries } - - override def activateOptions() { - // check for config parameter validity - val props = new Properties() - if(brokerList != null) - props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - if(props.isEmpty) - throw new MissingConfigException("The bootstrap servers property should be specified") - if(topic == null) - throw new MissingConfigException("topic must be specified by the Kafka log4j appender") - if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) - if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) - if(retries > 0) props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, retries.toString) - props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producer = new KafkaProducer[Array[Byte],Array[Byte]](props) - LogLog.debug("Kafka producer connected to " + brokerList) - LogLog.debug("Logging for topic: " + topic) - } - - override def append(event: LoggingEvent) { - val message = subAppend(event) - LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) - val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes())) - if (syncSend) response.get - } - - def subAppend(event: LoggingEvent): String = { - if(this.layout == null) - event.getRenderedMessage - else - this.layout.format(event) - } - - override def close() { - if(!this.closed) { - this.closed = true - producer.close() - } - } - - override def requiresLayout: Boolean = true -} diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala deleted file mode 100755 index 41366a1..0000000 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ /dev/null @@ -1,143 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log4j - -import kafka.consumer.SimpleConsumer -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, CoreUtils, Logging} -import kafka.api.FetchRequestBuilder -import kafka.producer.async.MissingConfigException -import kafka.serializer.Encoder -import kafka.zk.ZooKeeperTestHarness - -import java.util.Properties -import java.io.File - -import org.apache.log4j.spi.LoggingEvent -import org.apache.log4j.{PropertyConfigurator, Logger} -import org.junit.{After, Before, Test} -import org.scalatest.junit.JUnit3Suite - -import junit.framework.Assert._ - -class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { - - var logDirZk: File = null - var config: KafkaConfig = null - var server: KafkaServer = null - - var simpleConsumerZk: SimpleConsumer = null - - val tLogger = Logger.getLogger(getClass()) - - private val brokerZk = 0 - - @Before - override def setUp() { - super.setUp() - - val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect) - val logDirZkPath = propsZk.getProperty("log.dir") - logDirZk = new File(logDirZkPath) - config = KafkaConfig.fromProps(propsZk) - server = TestUtils.createServer(config) - simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 * 1024, "") - } - - @After - override def tearDown() { - simpleConsumerZk.close - server.shutdown - CoreUtils.rm(logDirZk) - super.tearDown() - } - - @Test - def testKafkaLog4jConfigs() { - // host missing - var props = new Properties() - props.put("log4j.rootLogger", "INFO") - props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - - try { - PropertyConfigurator.configure(props) - fail("Missing properties exception was expected !") - } catch { - case e: MissingConfigException => - } - - // topic missing - props = new Properties() - props.put("log4j.rootLogger", "INFO") - props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - - try { - PropertyConfigurator.configure(props) - fail("Missing properties exception was expected !") - } catch { - case e: MissingConfigException => - } - } - - @Test - def testLog4jAppends() { - 
PropertyConfigurator.configure(getLog4jConfig) - - for(i <- 1 to 5) - info("test") - - val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) - val fetchMessage = response.messageSet("test-topic", 0) - - var count = 0 - for(message <- fetchMessage) { - count = count + 1 - } - - assertEquals(5, count) - } - - private def getLog4jConfig: Properties = { - val props = new Properties() - props.put("log4j.rootLogger", "INFO") - props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) - props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.appender.KAFKA.RequiredNumAcks", "1") - props.put("log4j.appender.KAFKA.SyncSend", "true") - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") - props - } -} - -class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] { - def toBytes(event: LoggingEvent): Array[Byte] = { - event.getMessage.toString.getBytes(encoding) - } -} - diff --git a/log4j/src/main/java/org/apache/kafka/log4j/KafkaLog4jAppender.java b/log4j/src/main/java/org/apache/kafka/log4j/KafkaLog4jAppender.java new file mode 100644 index 0000000..ba63a14 --- /dev/null +++ b/log4j/src/main/java/org/apache/kafka/log4j/KafkaLog4jAppender.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.log4j;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class KafkaLog4jAppender extends AppenderSkeleton {
+
+    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+    private static final String ACKS_CONFIG = "acks";
+    private static final String RETRIES_CONFIG = "retries";
+    private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+    private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final Logger LOGGER = Logger.getLogger(KafkaLog4jAppender.class);
+
+    private String brokerList = null;
+    private String topic = null;
+    private String compressionType = null;
+
+    private int retries = 0;
+    private int requiredNumAcks = Integer.MAX_VALUE;
+    private boolean syncSend = false;
+    private Producer<byte[], byte[]> producer = null;
+
+    public Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
+    public int getRequiredNumAcks() {
+        return requiredNumAcks;
+    }
+
+    public void setRequiredNumAcks(int requiredNumAcks) {
+        this.requiredNumAcks = requiredNumAcks;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public String getCompressionType() {
+        return compressionType;
+    }
+
+    public void setCompressionType(String compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public boolean getSyncSend() {
+        return syncSend;
+    }
+
+    public void setSyncSend(boolean syncSend) {
+        this.syncSend = syncSend;
+    }
+
+    @Override
+    public void activateOptions() {
+        // check for config parameter validity
+        Properties props = new Properties();
+        if (brokerList != null) {
+            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        }
+        if (props.isEmpty()) {
+            throw new MissingConfigException("The bootstrap servers property should be specified");
+        }
+        if (topic == null) {
+            throw new MissingConfigException("topic must be specified by the Kafka log4j appender");
+        }
+        if (compressionType != null) {
+            props.put(COMPRESSION_TYPE_CONFIG, compressionType);
+        }
+        if (requiredNumAcks != Integer.MAX_VALUE) {
+            // acks is a string-typed producer config; passing the raw int fails validation
+            props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
+        }
+        if (retries > 0) {
+            props.put(RETRIES_CONFIG, retries);
+        }
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        this.producer = getKafkaProducer(props);
+        LOGGER.debug("Kafka producer connected to " + brokerList);
+        LOGGER.debug("Logging for topic: " + topic);
+    }
+
+    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+        return new KafkaProducer<byte[], byte[]>(props);
+    }
+
+    @Override
+    protected void append(LoggingEvent event) {
+        String message =
subAppend(event);
+        LOGGER.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
+        Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[], byte[]>(topic, message.getBytes()));
+        if (syncSend) {
+            try {
+                response.get();
+            } catch (InterruptedException ex) {
+                throw new RuntimeException(ex);
+            } catch (ExecutionException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    private String subAppend(LoggingEvent event) {
+        if (this.layout == null) {
+            return event.getRenderedMessage();
+        } else {
+            return this.layout.format(event);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!this.closed) {
+            this.closed = true;
+            producer.close();
+        }
+    }
+
+    @Override
+    public boolean requiresLayout() {
+        return true;
+    }
+}
+
+class MissingConfigException extends RuntimeException {
+    public MissingConfigException(String message) {
+        super(message);
+    }
+}
diff --git a/log4j/src/test/java/org/apache/kafka/log4j/KafkaLog4jAppenderTest.java b/log4j/src/test/java/org/apache/kafka/log4j/KafkaLog4jAppenderTest.java
new file mode 100644
index 0000000..61bc254
--- /dev/null
+++ b/log4j/src/test/java/org/apache/kafka/log4j/KafkaLog4jAppenderTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4j;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+public class KafkaLog4jAppenderTest {
+
+    Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+    @Test
+    public void testKafkaLog4jConfigs() {
+        // host missing
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4j.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+        try {
+            PropertyConfigurator.configure(props);
+            Assert.fail("Missing properties exception was expected !");
+        } catch (MissingConfigException ex) {
+            // It's OK!
+        }
+
+        // topic missing
+        props = new Properties();
+        props.put("log4j.rootLogger", "INFO");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4j.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.brokerList", "127.0.0.1:9093");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+        try {
+            PropertyConfigurator.configure(props);
+            Assert.fail("Missing properties exception was expected !");
+        } catch (MissingConfigException ex) {
+            // It's OK!
+        }
+    }
+
+
+    @Test
+    public void testLog4jAppends() throws UnsupportedEncodingException {
+        PropertyConfigurator.configure(getLog4jConfig());
+
+        for (int i = 1; i <= 5; ++i) {
+            if (logger.isInfoEnabled()) {
+                logger.info(getMessage(i));
+            }
+        }
+
+        Assert.assertEquals(5,
+            ((MockKafkaLog4jAppender) Logger.getRootLogger().getAppender("KAFKA")).getHistory().size());
+    }
+
+    private byte[] getMessage(int i) throws UnsupportedEncodingException {
+        return ("test_" + i).getBytes("UTF-8");
+    }
+
+    private Properties getLog4jConfig() {
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO, KAFKA");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4j.MockKafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.SyncSend", "true");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+        return props;
+    }
+}
+
diff --git a/log4j/src/test/java/org/apache/kafka/log4j/MockKafkaLog4jAppender.java b/log4j/src/test/java/org/apache/kafka/log4j/MockKafkaLog4jAppender.java
new file mode 100644
index 0000000..8f2a7f5
--- /dev/null
+++ b/log4j/src/test/java/org/apache/kafka/log4j/MockKafkaLog4jAppender.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4j;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Properties;
+
+public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
+    private MockProducer mockProducer = new MockProducer();
+
+    @Override
+    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+        return mockProducer;
+    }
+
+    @Override
+    protected void append(LoggingEvent event) {
+        if (super.getProducer() == null) {
+            activateOptions();
+        }
+        super.append(event);
+    }
+
+    protected java.util.List<ProducerRecord<byte[], byte[]>> getHistory() {
+        return mockProducer.history();
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index 83f764e..d557a1c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,4 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j'
-- 
2.3.2 (Apple Git-55)
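
For anyone migrating an existing configuration: the relocated appender keeps the same log4j-facing surface as the old kafka.producer.KafkaLog4jAppender (Topic, BrokerList, CompressionType, RequiredNumAcks, Retries, SyncSend), so a log4j.properties file only needs the appender class name updated. A minimal sketch, with the broker address and topic name as illustrative placeholders:

    log4j.rootLogger=INFO, KAFKA
    log4j.appender.KAFKA=org.apache.kafka.log4j.KafkaLog4jAppender
    log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
    # BrokerList is passed through as the producer's bootstrap.servers (placeholder address)
    log4j.appender.KAFKA.BrokerList=localhost:9092
    # Topic is required; activateOptions() throws MissingConfigException without it (placeholder name)
    log4j.appender.KAFKA.Topic=application-logs
    # RequiredNumAcks maps to the producer's acks setting
    log4j.appender.KAFKA.RequiredNumAcks=1
    # SyncSend=true makes append() block on each send's Future
    log4j.appender.KAFKA.SyncSend=true

SyncSend=true trades logging throughput for the guarantee that a message has been acknowledged before append() returns; leaving it at the default (false) keeps sends asynchronous. The tests above enable it so delivery is deterministic when asserting on the MockProducer history.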