diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 416ecad..eb23589 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -43,6 +43,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/clients/target/scala-${SCALA_VERSION}/clients*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 # classpath addition for release
 for file in $base_dir/libs/*.jar;
 do
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.java b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.java
new file mode 100644
index 0000000..af01d1f
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.java
@@ -0,0 +1,176 @@
+package kafka.tools.newproducer;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.clients.producer.KafkaProducer;
+import kafka.clients.producer.ProducerConfig;
+import kafka.clients.producer.ProducerRecord;
+import kafka.common.ByteSerialization;
+import kafka.common.utils.Utils;
+import kafka.consumer.Blacklist;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ZookeeperConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+/**
+ * Mirrors messages from a source cluster (consumed through the zookeeper
+ * consumer connector) into a target cluster using the new producer client.
+ */
+public class MirrorMaker {
+
+    /**
+     * Routes records to a pool of producers: keyed records are partitioned by
+     * key hash (a given key always lands on the same producer), unkeyed
+     * records are spread round-robin.
+     */
+    static class ProducerDataChannel {
+        private final List<KafkaProducer> producers;
+        // Atomic because send() is invoked concurrently by the mirror threads.
+        private final AtomicInteger producerIndex = new AtomicInteger(0);
+
+        public ProducerDataChannel() {
+            producers = new ArrayList<KafkaProducer>();
+        }
+
+        public void addProducer(KafkaProducer producer) {
+            producers.add(producer);
+        }
+
+        public void send(ProducerRecord data) {
+            final KafkaProducer producer;
+            if (data.key() == null) {
+                // Utils.abs guards against a negative index once the counter wraps.
+                producer = producers.get(Utils.abs(producerIndex.getAndIncrement()) % producers.size());
+            } else {
+                producer = producers.get(Utils.abs(data.key().hashCode()) % producers.size());
+            }
+            producer.send(data);
+        }
+
+        public void close() {
+            for (KafkaProducer producer : producers)
+                producer.close();
+        }
+    }
+
+    /** Drains a single consumer stream into the shared producer channel. */
+    static class MirrorThread extends Thread {
+        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        private final KafkaStream<byte[], byte[]> stream;
+        private final ProducerDataChannel producerChannel;
+
+        public MirrorThread(KafkaStream<byte[], byte[]> consumerStream, ProducerDataChannel producerChannel, int threadId) {
+            this.stream = consumerStream;
+            this.producerChannel = producerChannel;
+            this.setName("mirroring-thread-" + threadId);
+        }
+
+        @Override
+        public void run() {
+            try {
+                // The stream iterator blocks until the consumer connector is shut down.
+                for (MessageAndMetadata<byte[], byte[]> message : stream)
+                    producerChannel.send(new ProducerRecord(message.topic(), message.key(), message.message()));
+            } catch (Throwable t) {
+                t.printStackTrace();
+            } finally {
+                shutdownLatch.countDown();
+            }
+        }
+
+        public void awaitShutdown() {
+            try {
+                shutdownLatch.await();
+            } catch (InterruptedException ie) {
+                // Preserve the interrupt for callers further up the stack.
+                Thread.currentThread().interrupt();
+                ie.printStackTrace();
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        if (args.length != 5) {
+            System.err.println("USAGE: java " + MirrorMaker.class.getName() + " consumer_config producer_config whitelist blacklist num_streams");
+            System.exit(1);
+        }
+        String consumerConfigFile = args[0];
+        String producerConfigFile = args[1];
+        String whitelist = args[2];
+        String blacklist = args[3];
+        int numStreams = Integer.parseInt(args[4]);
+        boolean hasWhitelist = whitelist != null && whitelist.length() > 0;
+        boolean hasBlacklist = blacklist != null && blacklist.length() > 0;
+        if (!(hasWhitelist ^ hasBlacklist)) {
+            System.err.println("Exactly one of whitelist or blacklist must be specified");
+            System.exit(1);
+        }
+        Properties consumerProperties = new Properties();
+        Properties producerProperties = new Properties();
+        try {
+            FileInputStream consumerConfigInputStream = new FileInputStream(consumerConfigFile);
+            try {
+                consumerProperties.load(consumerConfigInputStream);
+            } finally {
+                consumerConfigInputStream.close();
+            }
+            FileInputStream producerConfigInputStream = new FileInputStream(producerConfigFile);
+            try {
+                producerProperties.load(producerConfigInputStream);
+            } finally {
+                producerConfigInputStream.close();
+            }
+        } catch (IOException ioe) {
+            // Fail fast: running with empty/partial configs would only fail later, less clearly.
+            ioe.printStackTrace();
+            System.exit(1);
+        }
+        producerProperties.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL, "true");
+        producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
+        producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
+        System.out.println("Consumer properties: " + consumerProperties.toString());
+        System.out.println("Producer properties: " + producerProperties.toString());
+        TopicFilter topicFilter = hasWhitelist ? new Whitelist(whitelist) : new Blacklist(blacklist);
+        final ZookeeperConsumerConnector consumer = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProperties));
+        final List<MirrorThread> mirrorThreads = new ArrayList<MirrorThread>();
+        final ProducerDataChannel producerChannel = new ProducerDataChannel();
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                consumer.shutdown();
+                for (MirrorThread mirrorThread : mirrorThreads)
+                    mirrorThread.awaitShutdown();
+                producerChannel.close();
+            }
+        });
+        List<KafkaStream<byte[], byte[]>> streams = null;
+        try {
+            // Honor the num_streams argument (was hard-coded to 1).
+            streams = consumer.createMessageStreamsByFilter(topicFilter, numStreams);
+        } catch (Throwable t) {
+            System.err.println("Unable to create streams - shutting down mirror maker");
+            t.printStackTrace();
+            consumer.shutdown();
+            System.exit(1);
+        }
+        for (int i = 0; i < streams.size(); i++)
+            producerChannel.addProducer(new KafkaProducer(producerProperties));
+        int threadId = 0;
+        for (KafkaStream<byte[], byte[]> stream : streams) {
+            MirrorThread mirrorThread = new MirrorThread(stream, producerChannel, threadId++);
+            mirrorThreads.add(mirrorThread);
+            mirrorThread.start();
+        }
+        for (MirrorThread mirrorThread : mirrorThreads)
+            mirrorThread.awaitShutdown();
+        System.out.println("Shut down mirror maker");
+    }
+}
diff --git a/project/Build.scala b/project/Build.scala
index ddcfc41..1b8a392 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -138,15 +138,15 @@ object KafkaBuild extends Build {
     }
   }
 
-  lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++
+  lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf, clients).settings((commonSettings ++
     runRatTask ++ releaseTask ++ releaseZipTask ++ releaseTarTask): _*)
 
-  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*)
+  lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*) dependsOn (clients)
 
   lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
   lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
   lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*)
   lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
   lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
-  lazy val clients = Project(id = "kafka-clients", base = file("clients"))
+  lazy val clients = Project(id = "kafka-clients", base = file("clients")).settings(commonSettings: _*)
 }