diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a780a41..bc68307 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -646,6 +646,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicThreadId = e._1
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
+      debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
         config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..4738c3b 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -41,7 +41,7 @@ class Producer[K,V](config: ProducerConfig,
     case "async" =>
       sync = false
       val asyncProducerID = random.nextInt(Int.MaxValue)
-      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
+      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                        queue,
                                                        eventHandler,
                                                        config.queueBufferingMaxMs,
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index dbbddae..c71c76b 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -34,25 +34,26 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
 /**
- * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
- * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
- * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+ * This is a kafka 0.7 to 0.8 online migration tool used for migrating data from a 0.7 to a 0.8 cluster. Internally,
+ * it's composed of a kafka 0.7 consumer and a kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+ * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
  *
- * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
- * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
- * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
- * will still load the 08 version class.
+ * The 0.7 consumer is loaded from the kafka 0.7 jar using a "parent last, child first" java class loader.
+ * An ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+ * class names like "kafka.consumer.Consumer", etc., so an ordinary java URLClassLoader with the kafka 0.7 jar
+ * will still load the 0.8 version of the class.
  *
- * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+ * As kafka 0.7 and kafka 0.8 use different versions of zkClient, the zkClient jar used by kafka 0.7 should
  * also be used by the class loader.
  *
- * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
- * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+ * The user needs to provide configuration files for the 0.7 consumer and the 0.8 producer. For the 0.8 producer,
+ * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
  */
 public class KafkaMigrationTool
@@ -83,13 +84,13 @@ public class KafkaMigrationTool
   public static void main(String[] args){
     OptionParser parser = new OptionParser();
     ArgumentAcceptingOptionSpec<String> consumerConfigOpt
-      = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source cluster. " + "You man specify multiple of these.")
+      = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You may specify multiple of these.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<String> producerConfigOpt
-      = parser.accepts("producer.config", "Embedded producer config.")
+      = parser.accepts("producer.config", "Producer config.")
       .withRequiredArg()
       .describedAs("config file")
       .ofType(String.class);
@@ -114,20 +115,20 @@ public class KafkaMigrationTool
       .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
-      = parser.accepts("num.streams", "Number of consumption streams.")
+      = parser.accepts("num.streams", "Number of consumer streams")
       .withRequiredArg()
-      .describedAs("Number of threads")
+      .describedAs("Number of consumer threads")
       .ofType(Integer.class)
       .defaultsTo(1);
 
     ArgumentAcceptingOptionSpec<String> whitelistOpt
-      = parser.accepts("whitelist", "Whitelist of topics to mirror.")
+      = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
       .withRequiredArg()
       .describedAs("Java regex (String)")
       .ofType(String.class);
 
     ArgumentAcceptingOptionSpec<String> blacklistOpt
-      = parser.accepts("blacklist", "Blacklist of topics to mirror.")
+      = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
       .withRequiredArg()
       .describedAs("Java regex (String)")
       .ofType(String.class);
@@ -137,8 +138,8 @@ public class KafkaMigrationTool
 
     OptionSet options = parser.parse(args);
 
-    try{
-      if (options.has(helpOpt)){
+    try {
+      if (options.has(helpOpt)) {
         parser.printHelpOn(System.out);
         System.exit(0);
       }
@@ -146,7 +147,7 @@ public class KafkaMigrationTool
       checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
       int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
       int blackListCount = options.has(blacklistOpt) ? 1 : 0;
-      if(whiteListCount + blackListCount != 1){
+      if(whiteListCount + blackListCount != 1) {
         System.err.println("Exactly one of whitelist or blacklist is required.");
         System.exit(1);
       }
@@ -154,14 +155,14 @@ public class KafkaMigrationTool
       String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
       String zkClientJarFile = options.valueOf(zkClient01JarOpt);
       String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
-      int numStreams = options.valueOf(numStreamsOpt);
+      int numConsumers = options.valueOf(numStreamsOpt);
       String producerConfigFile_08 = options.valueOf(producerConfigOpt);
       int numProducers = options.valueOf(numProducersOpt);
 
       File kafkaJar_07 = new File(kafkaJarFile_07);
       File zkClientJar = new File(zkClientJarFile);
 
-      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
+      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
         kafkaJar_07.toURI().toURL(),
         zkClientJar.toURI().toURL()
       });
@@ -182,7 +183,7 @@ public class KafkaMigrationTool
       Properties kafkaConsumerProperties_07 = new Properties();
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
       /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
-      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
+      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
         logger.warn("Shallow iterator should not be used in the migration tool");
         kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
       }
@@ -207,34 +208,43 @@ public class KafkaMigrationTool
       else
         filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
 
-      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numStreams);
+      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
 
       Properties kafkaProducerProperties_08 = new Properties();
       kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
       kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
-      ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
 
       List<Producer> producers = new ArrayList<Producer>();
-      for (int i = 0; i < numProducers; i++){
+      for (int i = 0; i < numProducers; i++) {
+        // setting the client id will help identify the producer threads. This will also get us
+        // per-producer metrics, which are useful to troubleshoot bottlenecks
+        kafkaProducerProperties_08.put("client.id", String.valueOf(i));
+        ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
         producers.add(new Producer(producerConfig_08));
       }
-
+      // don't give all the producers to all the consumer threads! That would cause all the migration threads to
+      // fight for the same producers and spend a lot of time locking the producer queues.
+      // Instead, partition the producers amongst the consumer threads.
+      int[] producerIdsPerConsumer = Utils.partition(numProducers, numConsumers);
       int threadId = 0;
-      for(Object stream: (List)retKafkaStreams){
-        MigrationThread thread = new MigrationThread(stream, producers, threadId);
+      for(Object stream: (List)retKafkaStreams) {
+        int startProducerIndex = producerIdsPerConsumer[threadId];
+        int endProducerIndex = numProducers;
+        if(threadId + 1 < numConsumers)
+          endProducerIndex = producerIdsPerConsumer[threadId + 1];
+        List<Producer> producersForThisConsumer = producers.subList(startProducerIndex, endProducerIndex);
+        logger.info(String.format("Migration thread %d started with producers [%d, %d)", threadId, startProducerIndex, endProducerIndex));
+        MigrationThread thread = new MigrationThread(stream, producersForThisConsumer, threadId);
         threadId ++;
         thread.start();
       }
     }
     catch (Throwable e){
-      System.out.println("Kafka migration tool failed because of " + e);
-      e.printStackTrace(System.out);
       logger.error("Kafka migration tool failed: ", e);
     }
   }
 
-  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException
-  {
+  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
     for(OptionSpec arg : required) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"");
@@ -244,24 +254,23 @@ public class KafkaMigrationTool
     }
   }
 
-
-  private static class MigrationThread extends Thread{
+  private static class MigrationThread extends Thread {
     private Object stream;
     private List<Producer> producers;
     private int threadId;
     private String threadName;
     private org.apache.log4j.Logger logger;
 
-    MigrationThread(Object _stream, List<Producer> _producers, int _threadId){
+    MigrationThread(Object _stream, List<Producer> _producers, int _threadId) {
       stream = _stream;
       producers = _producers;
       threadId = _threadId;
       threadName = "MigrationThread-" + threadId;
-      logger = org.apache.log4j.Logger.getLogger(threadName);
+      logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
       this.setName(threadName);
     }
 
-    public void run(){
+    public void run() {
       try{
         Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
         Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
@@ -273,9 +282,14 @@ public class KafkaMigrationTool
 
         Object iterator = ConsumerIteratorMethod.invoke(stream);
 
+        // shuffle the producers before creating a circular iterator to prevent all migration threads from
+        // fighting over the same producer queue. This is not required if numConsumers <= numProducers,
+        // but is required when numConsumers > numProducers because in that case multiple consumers will
+        // share some producers
+        Collections.shuffle(producers);
         Iterator<Producer> producerCircularIterator = Utils.circularIterator(JavaConversions.asBuffer(producers));
 
-        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()){
+        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
           Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
           Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
           Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
@@ -283,36 +297,36 @@ public class KafkaMigrationTool
           int size = ((ByteBuffer)payload_07).remaining();
           byte[] bytes = new byte[size];
           ((ByteBuffer)payload_07).get(bytes);
-          logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
+          if(logger.isDebugEnabled())
+            logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic " + topic);
           KeyedMessage producerData = new KeyedMessage((String)topic, null, bytes);
           Producer nextProducer = producerCircularIterator.next();
           nextProducer.send(producerData);
         }
-        logger.info(String.format("Migration thread %s finishes running", threadName));
+        logger.info("Migration thread " + threadName + " finished running");
       }
       catch (Throwable t){
         logger.fatal("Migration thread failure due to ", t);
-        t.printStackTrace(System.out);
       }
     }
   }
 
   /**
-   * A parent-last classloader that will try the child classloader first and then the parent.
+   * A parent-last class loader that will try the child class loader first and then the parent.
    * This takes a fair bit of doing because java really prefers parent-first.
    */
-  private static class ParentLastURLClassLoader extends ClassLoader{
+  private static class ParentLastURLClassLoader extends ClassLoader {
     private ChildURLClassLoader childClassLoader;
 
     /**
-     * This class allows me to call findClass on a classloader
+     * This class allows me to call findClass on a class loader
      */
-    private static class FindClassClassLoader extends ClassLoader{
-      public FindClassClassLoader(ClassLoader parent){
+    private static class FindClassClassLoader extends ClassLoader {
+      public FindClassClassLoader(ClassLoader parent) {
        super(parent);
      }
 
      @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException{
+      public Class<?> findClass(String name) throws ClassNotFoundException {
        return super.findClass(name);
      }
    }
@@ -327,14 +341,15 @@ public class KafkaMigrationTool
        super(urls, null);
        this.realParent = realParent;
      }
+
      @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException{
+      public Class<?> findClass(String name) throws ClassNotFoundException {
        try{
          // first try to use the URLClassLoader findClass
          return super.findClass(name);
        }
-        catch( ClassNotFoundException e ){
-          // if that fails, we ask our real parent classloader to load the class (we give up)
+        catch( ClassNotFoundException e ) {
+          // if that fails, we ask our real parent class loader to load the class (we give up)
          return realParent.loadClass(name);
        }
      }
@@ -347,11 +362,11 @@ public class KafkaMigrationTool
    @Override
    protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-      try{
-        // first we try to find a class inside the child classloader
+      try {
+        // first we try to find a class inside the child class loader
        return childClassLoader.findClass(name);
      }
-      catch( ClassNotFoundException e ){
+      catch( ClassNotFoundException e ) {
        // didn't find it, try the parent
        return super.loadClass(name, resolve);
      }
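To make the index arithmetic in main() above concrete, here is a minimal Scala sketch (not part of the patch; the object name and the string stand-ins for Producer instances are illustrative) of how the start indices returned by Utils.partition, added in the next file, become the half-open [start, end) producer ranges handed to each MigrationThread:

import kafka.utils.Utils

object ProducerAssignmentSketch extends App {
  val numProducers = 5
  val numConsumers = 3
  val producers = List("p0", "p1", "p2", "p3", "p4") // stand-ins for Producer instances
  // partition returns each consumer's start index; a consumer's end index is the next
  // consumer's start index, or numProducers for the last consumer
  val startIndices = Utils.partition(numProducers, numConsumers)
  for (threadId <- 0 until numConsumers) {
    val start = startIndices(threadId)
    val end = if (threadId + 1 < numConsumers) startIndices(threadId + 1) else numProducers
    // mirrors producers.subList(startProducerIndex, endProducerIndex) in main()
    println("consumer %d -> producers [%d, %d): %s".format(threadId, start, end, producers.slice(start, end)))
  }
}

With 5 producers and 3 consumers, partition returns [0, 2, 4], so the consumer threads get producers [0, 2), [2, 4) and [4, 5) respectively and no producer queue is shared.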
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 0185c14..c0a6028 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,6 +25,7 @@ import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
+import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
@@ -548,5 +549,36 @@ object Utils extends Logging {
   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
   */
  def abs(n: Int) = n & 0x7fffffff
+
+  /**
+   * Partition n items amongst m entities and report the start index of the range of
+   * items assigned to each of the m entities.
+   * Both items and entities should be > 0
+   */
+  def partition(items: Int, entities: Int): Array[Int] = {
+    require(items > 0)
+    require(entities > 0)
+    val numItemsPerEntity = items / entities
+    var nextItemIndex = 0
+    val partitionedItems =
+      if(numItemsPerEntity < 1) {
+        for(entity <- 0 until entities) yield {
+          entity % items
+        }
+      } else {
+        val remainingItems = items - (numItemsPerEntity * entities)
+        for(entity <- 0 until entities) yield {
+          val itemsForThisEntity =
+            if(remainingItems > 0 && entity < remainingItems) {
+              numItemsPerEntity + 1
+            } else
+              numItemsPerEntity
+          val startItemIndex = nextItemIndex
+          nextItemIndex += itemsForThisEntity
+          startItemIndex
+        }
+      }
+    partitionedItems.toArray
+  }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..c3cb789 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -21,9 +21,9 @@ import java.util.Arrays
 import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
-import org.junit.Test
 import org.junit.Assert._
 import kafka.common.KafkaException
+import org.junit.Test
 
 class UtilsTest extends JUnitSuite {
 
@@ -53,7 +53,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(2, its.next())
     assertEquals(1, its.next())
   }
-  
+
   @Test
   def testReadBytes() {
     for(testCase <- List("", "a", "abcd")) {
@@ -74,4 +74,23 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
+
+  @Test
+  def testPartitioning() {
+    var partitionedItems = Utils.partition(items = 4, entities = 2)
+    assertEquals(2, partitionedItems.size)
+    assertEquals(Seq(0, 2), partitionedItems.toSeq)
+    partitionedItems = Utils.partition(items = 4, entities = 3)
+    assertEquals(3, partitionedItems.size)
+    assertEquals(Seq(0, 2, 3), partitionedItems.toSeq)
+    partitionedItems = Utils.partition(items = 3, entities = 4)
+    assertEquals(4, partitionedItems.size)
+    assertEquals(Seq(0, 1, 2, 0), partitionedItems.toSeq)
+    partitionedItems = Utils.partition(items = 3, entities = 5)
+    assertEquals(5, partitionedItems.size)
+    assertEquals(Seq(0, 1, 2, 0, 1), partitionedItems.toSeq)
+    partitionedItems = Utils.partition(items = 1, entities = 1)
+    assertEquals(1, partitionedItems.size)
+    assertEquals(Seq(0), partitionedItems.toSeq)
+  }
 }
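A companion sketch (again not part of the patch; the object name and string stand-ins are illustrative, and it assumes the kafka.utils.Utils from this patch is on the classpath) of the shuffle-then-cycle pattern MigrationThread uses, built on the same Utils.circularIterator that the Java code calls through JavaConversions.asBuffer. Shuffling gives each thread a different starting producer, so threads that share producers tend not to hit the same queue first:

import scala.util.Random
import kafka.utils.Utils

object CircularProducerSketch extends App {
  // each MigrationThread shuffles its own view of the (possibly shared) producers...
  val producers = Random.shuffle(List("p0", "p1", "p2")) // stand-ins for Producer instances
  // ...then cycles over them forever, spreading sends round-robin
  val producerCircularIterator = Utils.circularIterator(producers)
  for (i <- 0 until 7) // seven hypothetical sends
    println("message %d -> producer %s".format(i, producerCircularIterator.next()))
}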