diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 90cacbd..a6423f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -283,7 +283,7 @@ public class KafkaProducer implements Producer { } public List partitionsFor(String topic) { - return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic); + return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index c0f1d57..6a0f3b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -102,7 +102,7 @@ public class MockProducer implements Producer { @Override public synchronized Future send(ProducerRecord record, Callback callback) { int partition = 0; - if (this.cluster.partitionsForTopic(record.topic()) != null) + if (this.cluster.partitionsFor(record.topic()) != null) partition = partitioner.partition(record, this.cluster); ProduceRequestResult result = new ProduceRequestResult(); FutureRecordMetadata future = new FutureRecordMetadata(result, 0); @@ -133,7 +133,7 @@ public class MockProducer implements Producer { } public List partitionsFor(String topic) { - return this.cluster.partitionsForTopic(topic); + return this.cluster.partitionsFor(topic); } public Map metrics() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index f47a461..f114ffd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -81,7 +81,7 @@ public final class Metadata { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; do { - partitions = cluster.partitionsForTopic(topic); + partitions = cluster.partitionsFor(topic); if (partitions == null) { topics.add(topic); forceUpdate = true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index 40e8234..fbb732a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -41,10 +41,10 @@ public class Partitioner { * Compute the partition for the given record. 
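Note on the rename above: partitionsForTopic -> partitionsFor is purely mechanical, and callers keep the same null-means-no-metadata contract. A minimal sketch of the call-site pattern (the helper class and method here are illustrative, not part of the patch):

    import java.util.List;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;

    public class PartitionLookupExample {
        // Returns the number of partitions the cluster currently knows for the topic,
        // or 0 if no metadata has been fetched yet (partitionsFor returns null in that case).
        public static int partitionCount(Cluster cluster, String topic) {
            List<PartitionInfo> partitions = cluster.partitionsFor(topic);
            return partitions == null ? 0 : partitions.size();
        }
    }
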
* * @param record The record being sent - * @param cluster The current cluster metadata + * @param cluster The cluster metadata, used to look up the number of partitions for the record's topic */ public int partition(ProducerRecord record, Cluster cluster) { - List partitions = cluster.partitionsForTopic(record.topic()); + List partitions = cluster.partitionsFor(record.topic()); int numPartitions = partitions.size(); if (record.partition() != null) { // they have given us a partition, use it diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 5ededcc..2d7e52d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,13 +13,15 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; @@ -109,6 +111,11 @@ public final class RecordAccumulator { return free.availableMemory(); } }); + metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() { + public double measure(MetricConfig config, long nowMs) { + return ready(nowMs).size(); + } + }); } /** @@ -173,10 +180,9 @@ public final class RecordAccumulator { } /** - * Get a list of nodes whose partitions are ready to be sent. + * Get a list of topic-partitions which are ready to be sent. *

- * A destination node is ready to send data if ANY one of its partition is not backing off the send - * and ANY of the following are true : + * A partition is ready if it is not backing off a retry and ANY of the following are true: *

    *
  1. The record set is full *
  2. The record set has sat in the accumulator for at least lingerMs milliseconds @@ -185,31 +191,24 @@ public final class RecordAccumulator { *
  3. The accumulator has been closed *
*/ - public Set ready(Cluster cluster, long nowMs) { - Set readyNodes = new HashSet(); + public List ready(long nowMs) { + List ready = new ArrayList(); boolean exhausted = this.free.queued() > 0; - for (Map.Entry> entry : this.batches.entrySet()) { - TopicPartition part = entry.getKey(); Deque deque = entry.getValue(); - // if the leader is unknown use an Unknown node placeholder - Node leader = cluster.leaderFor(part); - if (!readyNodes.contains(leader)) { - synchronized (deque) { - RecordBatch batch = deque.peekFirst(); - if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; - boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = nowMs - batch.createdMs >= lingerMs; - boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) - readyNodes.add(leader); - } + synchronized (deque) { + RecordBatch batch = deque.peekFirst(); + if (batch != null) { + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; + boolean full = deque.size() > 1 || batch.records.isFull(); + boolean expired = nowMs - batch.createdMs >= lingerMs; + boolean sendable = full || expired || exhausted || closed; + if (sendable && !backingOff) + ready.add(batch.topicPartition); } } } - - return readyNodes; + return ready; } /** @@ -227,55 +226,45 @@ public final class RecordAccumulator { } /** - * Drain all the data for the given nodes and collate them into a list of - * batches that will fit within the specified size on a per-node basis. - * This method attempts to avoid choosing the same topic-node over and over. + * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts + * to avoid choosing the same topic-partitions over and over. * - * @param cluster The current cluster metadata - * @param nodes The list of node to drain + * @param partitions The list of partitions to drain * @param maxSize The maximum number of bytes to drain * @param nowMs The current unix time in milliseconds - * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. + * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize. 
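To make the readiness rule above concrete: the new ready(nowMs) looks only at the head batch of each partition's deque. A standalone sketch of that predicate (the helper class and its flattened parameter list are illustrative; in the patch the check lives inline in the loop above):

    // Illustrative helper only; mirrors the checks in RecordAccumulator.ready(nowMs).
    public final class ReadinessCheck {
        public static boolean isSendable(int dequeSize, boolean headFull, long createdMs,
                                         int attempts, long lastAttemptMs,
                                         long lingerMs, long retryBackoffMs,
                                         boolean memoryExhausted, boolean closed, long nowMs) {
            boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
            boolean full = dequeSize > 1 || headFull;          // a later batch exists, or the head batch is full
            boolean expired = nowMs - createdMs >= lingerMs;   // the batch has lingered long enough
            boolean sendable = full || expired || memoryExhausted || closed;
            return sendable && !backingOff;                    // never send while in the retry backoff window
        }
    }
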
* TODO: There may be a starvation issue due to iteration order */ - public Map> drain(Cluster cluster, Set nodes, int maxSize, long nowMs) { - if (nodes.isEmpty()) - return Collections.emptyMap(); - - Map> batches = new HashMap>(); - for (Node node : nodes) { - int size = 0; - List parts = cluster.partitionsForNode(node.id()); - List ready = new ArrayList(); - /* to make starvation less likely this loop doesn't start at 0 */ - int start = drainIndex = drainIndex % parts.size(); - do { - PartitionInfo part = parts.get(drainIndex); - Deque deque = dequeFor(new TopicPartition(part.topic(), part.partition())); - if (deque != null) { - synchronized (deque) { - RecordBatch first = deque.peekFirst(); - if (first != null) { - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request - break; - } else { - RecordBatch batch = deque.pollFirst(); - batch.records.close(); - size += batch.records.sizeInBytes(); - ready.add(batch); - batch.drainedMs = nowMs; - } - } + public List drain(List partitions, int maxSize, long nowMs) { + if (partitions.isEmpty()) + return Collections.emptyList(); + int size = 0; + List ready = new ArrayList(); + /* to make starvation less likely this loop doesn't start at 0 */ + int start = drainIndex = drainIndex % partitions.size(); + do { + TopicPartition tp = partitions.get(drainIndex); + Deque deque = dequeFor(tp); + if (deque != null) { + synchronized (deque) { + RecordBatch first = deque.peekFirst(); + if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due + // to compression; in this case we will still eventually send this batch in a single + // request + return ready; + } else { + RecordBatch batch = deque.pollFirst(); + batch.records.close(); + size += batch.records.sizeInBytes(); + ready.add(batch); + batch.drainedMs = nowMs; } } - this.drainIndex = (this.drainIndex + 1) % parts.size(); - } while (start != drainIndex); - batches.put(node.id(), ready); - } - return batches; + } + this.drainIndex = (this.drainIndex + 1) % partitions.size(); + } while (start != drainIndex); + return ready; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 3e83ae0..f0152fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -15,7 +15,15 @@ package org.apache.kafka.clients.producer.internals; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -184,18 +192,18 @@ public class Sender implements Runnable { public void run(long nowMs) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - Set ready = this.accumulator.ready(cluster, nowMs); + List ready = this.accumulator.ready(nowMs); // should we update our metadata? 
List sends = new ArrayList(); maybeUpdateMetadata(cluster, sends, nowMs); - // prune the list of ready nodes to eliminate any that we aren't ready to send yet - Set sendable = processReadyNode(ready, nowMs); + // prune the list of ready partitions to eliminate any that we aren't ready to send to yet + List sendable = processReadyPartitions(cluster, ready, nowMs); // create produce requests - Map> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs); - List requests = generateProduceRequests(batches, nowMs); + List batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs); + List requests = collate(cluster, batches, nowMs); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { @@ -302,21 +310,19 @@ public class Sender implements Runnable { } /** - * Process the set of destination nodes with data ready to send. - * - * 1) If we have an unknown leader node, force refresh the metadata. - * 2) If we have a connection to the appropriate node, add - * it to the returned set; - * 3) If we have not a connection yet, initialize one + * Process the list of topic-partitions with data ready to send. If we are connected to a partition's leader and can + * send to it, add the partition to the returned list. If the leader is unknown, force a metadata refresh; if we are + * not yet connected to the leader, initiate a connection. */ - private Set processReadyNode(Set ready, long nowMs) { - Set sendable = new HashSet(ready.size()); - for (Node node : ready) { + private List processReadyPartitions(Cluster cluster, List ready, long nowMs) { + List sendable = new ArrayList(ready.size()); + for (TopicPartition tp : ready) { + Node node = cluster.leaderFor(tp); if (node == null) { // we don't know about this topic/partition or it has no leader, re-fetch metadata metadata.forceUpdate(); } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { - sendable.add(node); + sendable.add(tp); } else if (nodeStates.canConnect(node.id(), nowMs)) { // we don't have a connection to this node right now, make one initiateConnect(node, nowMs); @@ -514,9 +520,19 @@ public class Sender implements Runnable { } /** - * Transfer the record batches into a list of produce requests on a per-node basis + * Collate the record batches into a list of produce requests on a per-node basis */ - private List generateProduceRequests(Map> collated, long nowMs) { + private List collate(Cluster cluster, List batches, long nowMs) { + Map> collated = new HashMap>(); + for (RecordBatch batch : batches) { + Node node = cluster.leaderFor(batch.topicPartition); + List found = collated.get(node.id()); + if (found == null) { + found = new ArrayList(); + collated.put(node.id(), found); + } + found.add(batch); + } List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue())); diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index c62707a..426bd1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -12,8 +12,6 @@ */ package org.apache.kafka.common; -import org.apache.kafka.common.utils.Utils; - import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -30,7 +28,6 @@ public final class Cluster { private final List nodes; private final Map partitionsByTopicPartition; private final Map>
partitionsByTopic; - private final Map> partitionsByNode; /** * Create a new cluster with the given nodes and partitions @@ -48,32 +45,18 @@ public final class Cluster { for (PartitionInfo p : partitions) this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p); - // index the partitions by topic and node respectively, and make the lists - // unmodifiable so we can hand them out in user-facing apis without risk - // of the client modifying the contents - HashMap> partsForTopic = new HashMap>(); - HashMap> partsForNode = new HashMap>(); - for (Node n : this.nodes) { - partsForNode.put(n.id(), new ArrayList()); - } + // index the partitions by topic and make the lists unmodifiable so we can hand them out in + // user-facing apis without risk of the client modifying the contents + HashMap> parts = new HashMap>(); for (PartitionInfo p : partitions) { - if (!partsForTopic.containsKey(p.topic())) - partsForTopic.put(p.topic(), new ArrayList()); - List psTopic = partsForTopic.get(p.topic()); - psTopic.add(p); - - if (p.leader() != null) { - List psNode = Utils.notNull(partsForNode.get(p.leader().id())); - psNode.add(p); - } + if (!parts.containsKey(p.topic())) + parts.put(p.topic(), new ArrayList()); + List ps = parts.get(p.topic()); + ps.add(p); } - this.partitionsByTopic = new HashMap>(partsForTopic.size()); - for (Map.Entry> entry : partsForTopic.entrySet()) + this.partitionsByTopic = new HashMap>(parts.size()); + for (Map.Entry> entry : parts.entrySet()) this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); - this.partitionsByNode = new HashMap>(partsForNode.size()); - for (Map.Entry> entry : partsForNode.entrySet()) - this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); - } /** @@ -130,19 +113,10 @@ public final class Cluster { * @param topic The topic name * @return A list of partitions */ - public List partitionsForTopic(String topic) { + public List partitionsFor(String topic) { return this.partitionsByTopic.get(topic); } - /** - * Get the list of partitions whose leader is this node - * @param nodeId The node id - * @return A list of partitions - */ - public List partitionsForNode(int nodeId) { - return this.partitionsByNode.get(nodeId); - } - @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index c4072ae..f37ab77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -17,13 +17,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; @@ -35,19 +34,11 @@ import org.junit.Test; public class RecordAccumulatorTest {
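The removal of partitionsForNode(...) just above works because the Sender now does the per-node grouping itself via Cluster.leaderFor(...). A standalone sketch of that grouping, shown with TopicPartition rather than the internal RecordBatch so it stays self-contained (the class and method names are illustrative, not from the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;

    public final class CollateExample {
        // Group ready partitions by the id of their current leader; partitions with an
        // unknown leader are skipped here (the Sender instead forces a metadata update).
        public static Map<Integer, List<TopicPartition>> byLeader(Cluster cluster, List<TopicPartition> ready) {
            Map<Integer, List<TopicPartition>> collated = new HashMap<Integer, List<TopicPartition>>();
            for (TopicPartition tp : ready) {
                Node leader = cluster.leaderFor(tp);
                if (leader == null)
                    continue; // no known leader: caller should refresh metadata
                List<TopicPartition> found = collated.get(leader.id());
                if (found == null) {
                    found = new ArrayList<TopicPartition>();
                    collated.put(leader.id(), found);
                }
                found.add(tp);
            }
            return collated;
        }
    }
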
- private String topic = "test"; - private int partition1 = 0; - private int partition2 = 1; - private Node node = new Node(0, "localhost", 1111); - private TopicPartition tp1 = new TopicPartition(topic, partition1); - private TopicPartition tp2 = new TopicPartition(topic, partition2); - private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null); - private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null); + private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2)); private Metrics metrics = new Metrics(time); @Test @@ -56,12 +47,12 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size()); + accum.append(tp, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready.", 0, accum.ready(now).size()); } - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); - List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + accum.append(tp, key, value, CompressionType.NONE, null); + assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -77,19 +68,19 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); - accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null); + assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); } @Test public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size()); + accum.append(tp, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size()); time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); - List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, 
batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -103,14 +94,14 @@ public class RecordAccumulatorTest { public void testPartialDrain() throws Exception { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time); int appends = 1024 / msgSize + 1; - List partitions = asList(tp1, tp2); + List partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1)); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size()); - List batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id()); + List batches = accum.drain(partitions, 1024, 0); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -118,7 +109,7 @@ public class RecordAccumulatorTest { public void testStressfulSituation() throws Exception { final int numThreads = 5; final int msgs = 10000; - final int numParts = 2; + final int numParts = 10; final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { @@ -126,7 +117,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); + accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null); } catch (Exception e) { e.printStackTrace(); } @@ -139,14 +130,12 @@ public class RecordAccumulatorTest { int read = 0; long now = time.milliseconds(); while (read < numThreads * msgs) { - Set nodes = accum.ready(cluster, now); - List batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id()); - if (batches != null) { - for (RecordBatch batch : batches) { - for (LogEntry entry : batch.records) - read++; - accum.deallocate(batch); - } + List tps = accum.ready(now); + List batches = accum.drain(tps, 5 * 1024, 0); + for (RecordBatch batch : batches) { + for (LogEntry entry : batch.records) + read++; + accum.deallocate(batch); } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java index c788e66..1df2266 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java @@ -20,8 +20,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.kafka.common.utils.AbstractIterator; import org.junit.Test; public class AbstractIteratorTest { @@ -44,7 +49,7 @@ public class AbstractIteratorTest { @Test(expected = NoSuchElementException.class) public void testEmptyIterator() { - Iterator iter = new ListIterator(Collections.emptyList()); + Iterator iter = new ListIterator(Arrays.asList()); iter.next(); } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 26730c4..fdfb160 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,7 +17,7 @@ package kafka.tools -import kafka.utils.{Utils, CommandLineUtils, Logging} +import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} import kafka.consumer._ import kafka.serializer._ import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} @@ -26,9 +26,11 @@ import org.apache.kafka.clients.producer.ProducerRecord import scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._ -import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch} +import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} import joptsimple.OptionParser +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge object MirrorMaker extends Logging { @@ -73,7 +75,8 @@ object MirrorMaker extends Logging { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer") + val bufferSizeOpt = parser.accepts("queue.size", + "Number of messages that are buffered between the consumer and producer") .withRequiredArg() .describedAs("Queue size in terms of number of messages") .ofType(classOf[java.lang.Integer]) @@ -114,7 +117,7 @@ object MirrorMaker extends Logging { val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // create data channel - val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize) + val mirrorDataChannel = new DataChannel(bufferSize) // create producer threads val producers = (1 to numProducers).map(_ => { @@ -178,14 +181,31 @@ object MirrorMaker extends Logging { info("Kafka mirror maker shutdown successfully") } + class DataChannel(capacity: Int) extends KafkaMetricsGroup { + + val queue = new ArrayBlockingQueue[ProducerRecord](capacity) + + newGauge( + "MirrorMaker-DataChannelSize", + new Gauge[Int] { + def value = queue.size + } + ) + + def put(record: ProducerRecord) = queue.put(record) + def put(record: ProducerRecord, timeout: Int): Boolean = queue.offer(record, timeout, TimeUnit.MILLISECONDS) + def take(timeout: Int): ProducerRecord = queue.poll(timeout, TimeUnit.MILLISECONDS) + } + class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - mirrorDataChannel: BlockingQueue[ProducerRecord], - producers: Seq[BaseProducer], - threadId: Int) - extends Thread with Logging { + mirrorDataChannel: DataChannel, + producers: Seq[BaseProducer], + threadId: Int) + extends Thread with Logging with KafkaMetricsGroup { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-consumer-" + threadId + private val waitMeter = newMeter(threadName + "-WaitOnQueuePercent", "percent", TimeUnit.MILLISECONDS) this.logIdent = "[%s] ".format(threadName) this.setName(threadName) @@ -199,7 +219,12 @@ object MirrorMaker extends Logging { if (msgAndMetadata.key == null) { trace("Send the non-keyed message the producer channel.") val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) - mirrorDataChannel.put(data) + var putSucceed = false + while (!putSucceed) { + val startPutTime = SystemTime.milliseconds + putSucceed = mirrorDataChannel.put(data, 500) + waitMeter.mark(SystemTime.milliseconds - startPutTime) + } } else { val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % 
producers.size() trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) @@ -226,11 +251,12 @@ object MirrorMaker extends Logging { } } - class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], + class ProducerThread (val dataChannel: DataChannel, val producer: BaseProducer, - val threadId: Int) extends Thread with Logging { + val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-producer-" + threadId private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + private val waitMeter = newMeter(threadName + "-WaitOnQueuePercent", "percent", TimeUnit.NANOSECONDS) this.logIdent = "[%s] ".format(threadName) setName(threadName) @@ -239,14 +265,18 @@ object MirrorMaker extends Logging { info("Starting mirror maker producer thread " + threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take - trace("Sending message with value size %d".format(data.value().size)) - - if(data eq shutdownMessage) { - info("Received shutdown message") - return + val startTakeTime = SystemTime.nanoseconds + val data: ProducerRecord = dataChannel.take(500) + waitMeter.mark(SystemTime.nanoseconds - startTakeTime) + + if (data != null) { + trace("Sending message with value size %d".format(data.value().size)) + if(data eq shutdownMessage) { + info("Received shutdown message") + return + } + producer.send(data.topic(), data.key(), data.value()) } - producer.send(data.topic(), data.key(), data.value()) } } catch { case t: Throwable => {
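For readers following the MirrorMaker change: the new DataChannel is essentially a bounded blocking queue with timed put/take plus a queue-size gauge. A generic Java sketch of the same shape (the element type is left generic and the Yammer Metrics gauge is omitted; the real class wraps ProducerRecord and registers MirrorMaker-DataChannelSize):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Bounded channel between consumer and producer threads: a blocking put, a timed put
    // that lets the caller measure how long it waited, and a timed take that returns null
    // on timeout so the producer loop can keep polling.
    public final class BoundedDataChannel<E> {
        private final ArrayBlockingQueue<E> queue;

        public BoundedDataChannel(int capacity) {
            this.queue = new ArrayBlockingQueue<E>(capacity);
        }

        public void put(E element) throws InterruptedException {
            queue.put(element);
        }

        public boolean put(E element, long timeoutMs) throws InterruptedException {
            return queue.offer(element, timeoutMs, TimeUnit.MILLISECONDS);
        }

        public E take(long timeoutMs) throws InterruptedException {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }

        public int size() {
            return queue.size(); // what the queue-size gauge would report
        }
    }
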