From 1c9f963d0c67f527b642a423f7f7363077d1342d Mon Sep 17 00:00:00 2001 From: Benoy Antony Date: Fri, 3 Apr 2015 16:18:40 -0700 Subject: [PATCH 1/2] KAFKA-2041 --- .../scala/kafka/producer/KafkaLog4jAppender.scala | 17 +++++++++-- core/src/main/scala/kafka/producer/Keyer.scala | 34 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 core/src/main/scala/kafka/producer/Keyer.scala diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 652dfb8..3408d36 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -21,7 +21,7 @@ import async.MissingConfigException import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog -import kafka.utils.Logging +import kafka.utils._ import java.util.{Properties, Date} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} @@ -31,6 +31,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var compressionType: String = null var requiredNumAcks: Int = Int.MaxValue var syncSend: Boolean = false + var keyClass: String = null + var keyer: Keyer = null + var props: Properties = null private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null @@ -49,9 +52,12 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getSyncSend: Boolean = syncSend def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend } + def getKeyClass: String = keyClass + def setKeyClass(keyClass: String) { this.keyClass = keyClass } + override def activateOptions() { // check for config parameter validity - val props = new Properties() + props = new Properties() if(brokerList != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) if(props.isEmpty) @@ -63,6 +69,10 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producer = new KafkaProducer[Array[Byte],Array[Byte]](props) + if(keyClass != null) { + keyer = Utils.createObject[Keyer](keyClass, props) + LogLog.debug("Instantiated Key class " + keyClass) + } LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) } @@ -70,7 +80,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { override def append(event: LoggingEvent) { val message = subAppend(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) - val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes())) + val keyBytes = if(keyer != null) keyer.getKey(message).getBytes() else null + val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, keyBytes, message.getBytes())) if (syncSend) response.get } diff --git a/core/src/main/scala/kafka/producer/Keyer.scala b/core/src/main/scala/kafka/producer/Keyer.scala new file mode 100644 index 0000000..597fe4e --- /dev/null +++ b/core/src/main/scala/kafka/producer/Keyer.scala @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.producer + +import java.util.Properties + +/** + * Keyer is used to derive a key from the message. A Key is used to determine kafka partition for the message. + * + * Implementations will be constructed via reflection and are required to have a constructor that takes a single + * VerifiableProperties instance--this allows passing configuration properties into the Keyer implementation. + */ +trait Keyer { + /** + * Uses the key to calculate a partition bucket id for routing + * the data to the appropriate broker partition + * @return returns a key based on the message + */ + def getKey(value: String): String +} -- 1.8.5.2 (Apple Git-48) From 8b11ffeb681fa56350314247974d02b698a138ca Mon Sep 17 00:00:00 2001 From: Benoy Antony Date: Fri, 3 Apr 2015 16:24:27 -0700 Subject: [PATCH 2/2] KAFKA-2077 --- .../main/scala/kafka/producer/KafkaLog4jAppender.scala | 17 ++++++++++++++--- core/src/main/scala/kafka/producer/TopicPicker.scala | 15 +++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) create mode 100644 core/src/main/scala/kafka/producer/TopicPicker.scala diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 3408d36..d6d5a81 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -33,6 +33,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var syncSend: Boolean = false var keyClass: String = null var keyer: Keyer = null + var topicClass: String = null + var topicPicker: TopicPicker = null var props: Properties = null private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null @@ -55,6 +57,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getKeyClass: String = keyClass def setKeyClass(keyClass: String) { this.keyClass = keyClass } + def getTopicClass: String = topicClass + def setTopicClass(topicClass: String) { this.topicClass = topicClass } + override def activateOptions() { // check for config parameter validity props = new Properties() @@ -62,8 +67,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) if(props.isEmpty) throw new MissingConfigException("The bootstrap servers property should be specified") - if(topic == null) - throw new MissingConfigException("topic must be specified by the Kafka log4j appender") + if(topic == null && topicClass == null) + throw new MissingConfigException("topic or topicClass must be specified by the Kafka log4j appender") + props.put("topic", topic); if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") @@ -73,6 +79,10 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { keyer = Utils.createObject[Keyer](keyClass, props) LogLog.debug("Instantiated Key class " + keyClass) } + if(topicClass != null) { + topicPicker = Utils.createObject[TopicPicker](topicClass, props) + LogLog.debug("Instantiated Topic class " + topicClass) + } LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) } @@ -81,7 +91,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { val message = subAppend(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) val keyBytes = if(keyer != null) keyer.getKey(message).getBytes() else null - val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, keyBytes, message.getBytes())) + val messageTopic = if(topicPicker != null) topicPicker.getTopic(message) else topic + val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](messageTopic, keyBytes, message.getBytes())) if (syncSend) response.get } diff --git a/core/src/main/scala/kafka/producer/TopicPicker.scala b/core/src/main/scala/kafka/producer/TopicPicker.scala new file mode 100644 index 0000000..359d937 --- /dev/null +++ b/core/src/main/scala/kafka/producer/TopicPicker.scala @@ -0,0 +1,15 @@ +package kafka.producer + +/** + * TopicPicker is used to derive a topic from the message. + * + * Implementations will be constructed via reflection and are required to have a constructor that takes a single + * VerifiableProperties instance--this allows passing configuration properties into the TopicPicker implementation. + */ +trait TopicPicker { + /** + * returns a topic based on the message. + * + */ + def getTopic(value: String): String +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48)