diff --git core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 3408d36..d6d5a81 100644 --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -33,6 +33,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var syncSend: Boolean = false var keyClass: String = null var keyer: Keyer = null + var topicClass: String = null + var topicPicker: TopicPicker = null var props: Properties = null private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null @@ -55,6 +57,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getKeyClass: String = keyClass def setKeyClass(keyClass: String) { this.keyClass = keyClass } + def getTopicClass: String = topicClass + def setTopicClass(topicClass: String) { this.topicClass = topicClass } + override def activateOptions() { // check for config parameter validity props = new Properties() @@ -62,8 +67,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) if(props.isEmpty) throw new MissingConfigException("The bootstrap servers property should be specified") - if(topic == null) - throw new MissingConfigException("topic must be specified by the Kafka log4j appender") + if(topic == null && topicClass == null) + throw new MissingConfigException("topic or topicClass must be specified by the Kafka log4j appender") + props.put("topic", topic); if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") @@ -73,6 +79,10 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { keyer = Utils.createObject[Keyer](keyClass, props) LogLog.debug("Instantiated Key class " + keyClass) } + if(topicClass != null) { + topicPicker = Utils.createObject[TopicPicker](topicClass, props) + LogLog.debug("Instantiated Topic class " + topicClass) + } LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) } @@ -81,7 +91,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { val message = subAppend(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) val keyBytes = if(keyer != null) keyer.getKey(message).getBytes() else null - val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, keyBytes, message.getBytes())) + val messageTopic = if(topicPicker != null) topicPicker.getTopic(message) else topic + val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](messageTopic, keyBytes, message.getBytes())) if (syncSend) response.get } diff --git core/src/main/scala/kafka/producer/TopicPicker.scala core/src/main/scala/kafka/producer/TopicPicker.scala new file mode 100644 index 0000000..359d937 --- /dev/null +++ core/src/main/scala/kafka/producer/TopicPicker.scala @@ -0,0 +1,15 @@ +package kafka.producer + +/** + * TopicPicker is used to derive a topic from the message. + * + * Implementations will be constructed via reflection and are required to have a constructor that takes a single + * VerifiableProperties instance--this allows passing configuration properties into the TopicPicker implementation. + */ +trait TopicPicker { + /** + * returns a topic based on the message. + * + */ + def getTopic(value: String): String +} \ No newline at end of file