diff --git log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 628ff53..bacfb78 100644 --- log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.helpers.LogLog; import org.apache.log4j.spi.LoggingEvent; @@ -46,6 +47,8 @@ public class KafkaLog4jAppender extends AppenderSkeleton { private String brokerList = null; private String topic = null; private String compressionType = null; + private String keyerClass = null; + private Keyer keyer = null; private int retries = 0; private int requiredNumAcks = Integer.MAX_VALUE; @@ -104,6 +107,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton { this.syncSend = syncSend; } + public String getKeyerClass() { + return keyerClass; + } + + public void setKeyerClass(String keyerClass) { + this.keyerClass = keyerClass; + } + @Override public void activateOptions() { // check for config parameter validity @@ -120,7 +131,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton { props.put(ACKS_CONFIG, requiredNumAcks); if (retries > 0) props.put(RETRIES_CONFIG, retries); - + if (keyerClass != null) { + try { + keyer = (Keyer) Utils.newInstance(Class.forName(keyerClass)); + LogLog.debug("Instantiated Key class " + keyerClass); + } catch (ClassNotFoundException e) { + LogLog.error("Could not find Key class", e); + } + } props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); this.producer = getKafkaProducer(props); @@ -136,7 +154,8 @@ public class KafkaLog4jAppender extends AppenderSkeleton { protected void append(LoggingEvent event) { String message = subAppend(event); LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message); - Future response = producer.send(new ProducerRecord(topic, message.getBytes())); + byte[] keyBytes = (keyer != null)?keyer.getKey(message).getBytes():null; + Future response = producer.send(new ProducerRecord(topic, keyBytes, message.getBytes())); if (syncSend) { try { response.get(); diff --git log4j-appender/src/main/java/org/apache/kafka/log4jappender/Keyer.java log4j-appender/src/main/java/org/apache/kafka/log4jappender/Keyer.java new file mode 100644 index 0000000..2493ba1 --- /dev/null +++ log4j-appender/src/main/java/org/apache/kafka/log4jappender/Keyer.java @@ -0,0 +1,32 @@ +package org.apache.kafka.log4jappender; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Keyer is used to derive a key from the message. A Key is used to determine kafka partition for the message. + * + * Implementations will be constructed via reflection and are required to have a constructor that takes a single + * VerifiableProperties instance--this allows passing configuration properties into the Keyer implementation. + */ +public interface Keyer { + + /** + * @return returns a key based on the message + */ + String getKey(String value); +} diff --git log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java index 71bdd94..7fc33e0 100644 --- log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java +++ log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java @@ -16,15 +16,18 @@ */ package org.apache.kafka.log4jappender; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.junit.Assert; import org.junit.Test; -import java.io.UnsupportedEncodingException; -import java.util.Properties; - public class KafkaLog4jAppenderTest { Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class); @@ -77,6 +80,24 @@ public class KafkaLog4jAppenderTest { 5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size()); } + @Test + public void testLog4jAppendsWithKey() throws UnsupportedEncodingException { + PropertyConfigurator.configure(getLog4jConfigWithKeys()); + + for (int i = 1; i <= 5; ++i) { + logger.error("test_" + i); + } + + Assert.assertEquals( + 5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size()); + List> list = + ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory(); + int i = 1; + for (ProducerRecord record : list) { + Assert.assertEquals(i++, Integer.parseInt(Utils.utf8(record.key()))); + } + } + private byte[] getMessage(int i) throws UnsupportedEncodingException { return ("test_" + i).getBytes("UTF-8"); } @@ -94,5 +115,11 @@ public class KafkaLog4jAppenderTest { props.put("log4j.logger.kafka.log4j", "INFO, KAFKA"); return props; } + + private Properties getLog4jConfigWithKeys() { + Properties props = getLog4jConfig(); + props.put("log4j.appender.KAFKA.KeyerClass", "org.apache.kafka.log4jappender.MockKeyer"); + return props; + } } diff --git log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKeyer.java log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKeyer.java new file mode 100644 index 0000000..31722e1 --- /dev/null +++ log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKeyer.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.log4jappender; + +public class MockKeyer implements Keyer { + + @Override + public String getKey(String value) { + //returns x from "test_x" (last character is newline) + return value.substring(value.length()-2, value.length()-1); + } +}