diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 652dfb8..37c7d80 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -21,7 +21,7 @@ import async.MissingConfigException
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
-import kafka.utils.Logging
+import kafka.utils._
 import java.util.{Properties, Date}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
@@ -31,6 +31,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var compressionType: String = null
   var requiredNumAcks: Int = Int.MaxValue
   var syncSend: Boolean = false
+  var keyClass: String = null
+  var keyer: Keyer = null
+  var props: Properties = null
 
   private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
 
@@ -49,9 +52,12 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   def getSyncSend: Boolean = syncSend
   def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
 
+  def getKeyClass: String = keyClass
+  def setKeyClass(keyClass: String) { this.keyClass = keyClass }
+
   override def activateOptions() {
     // check for config parameter validity
-    val props = new Properties()
+    props = new Properties()
     if(brokerList != null)
       props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     if(props.isEmpty)
@@ -63,6 +69,10 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
     props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
+    if(keyClass != null) {
+      keyer = Utils.createObject[Keyer](keyClass, props)
+      LogLog.debug("Instantiated Key class " + keyClass)
+    }
     LogLog.debug("Kafka producer connected to " + brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
@@ -70,7 +80,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def append(event: LoggingEvent) {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
+    val keyBytes = if(keyer != null) keyer.getKey(message).getBytes() else null
+    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, keyBytes, message.getBytes()))
     if (syncSend)
       response.get
   }
diff --git a/core/src/main/scala/kafka/producer/Keyer.scala b/core/src/main/scala/kafka/producer/Keyer.scala
new file mode 100644
index 0000000..597fe4e
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/Keyer.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import java.util.Properties
+
+/**
+ * Keyer is used to derive a key from the message. A Key is used to determine kafka partition for the message.
+ *
+ * Implementations will be constructed via reflection and are required to have a constructor that takes a single
+ * java.util.Properties instance--this allows passing configuration properties into the Keyer implementation.
+ */
+trait Keyer {
+  /**
+   * Derives the key for the given message; Kafka uses the key to
+   * choose the partition the message is routed to.
+   * @return the key derived from the message
+   */
+  def getKey(value: String): String
+}