diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d15562a..c58d4c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -43,10 +43,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.ClientUtils;
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,15 +219,16 @@ public class KafkaProducer implements Producer {
     @Override
     public Future send(ProducerRecord record, Callback callback) {
         try {
-            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs, sender.selector);
             int partition = partitioner.partition(record, cluster);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
-            this.sender.wakeup();
-            return future;
+            Pair futureAndIsWakeUp = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+            if (futureAndIsWakeUp.p2)
+                this.sender.wakeup();
+            return futureAndIsWakeUp.p1;
             // For API exceptions return them in the future;
             // for other exceptions throw directly
         } catch (ApiException e) {
@@ -262,7 +260,7 @@ public class KafkaProducer implements Producer {
     }
     public List partitionsFor(String topic) {
-        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
+        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs, sender.selector).partitionsForTopic(topic);
     }
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index f47a461..fd35933 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -19,6 +19,8 @@ import java.util.Set;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Selector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +78,7 @@ public final class Metadata {
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum amount of time to block waiting for metadata
      */
-    public synchronized Cluster fetch(String topic, long maxWaitMs) {
+    public synchronized Cluster fetch(String topic, long maxWaitMs, Selectable selector) {
         List partitions = null;
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
@@ -85,6 +87,8 @@ public final class Metadata {
         if (partitions == null) {
             topics.add(topic);
             forceUpdate = true;
+            if (selector != null)
+                selector.wakeup();
             try {
                 log.trace("Requesting metadata update for topic {}.", topic);
                 wait(remainingWaitMs);
@@ -105,11 +109,11 @@ public final class Metadata {
      * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
      * than metadataExpireMs has passed since the last refresh)
      */
-    public synchronized boolean needsUpdate(long nowMs) {
+    public synchronized long needsUpdate(long nowMs) {
         long msSinceLastUpdate = nowMs - this.lastRefreshMs;
-        boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
-        boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
-        return updateAllowed && updateNeeded;
+        long updateAllowed = Math.max(this.refreshBackoffMs - msSinceLastUpdate, 0);
+        long updateNeeded = this.forceUpdate ? 0 : this.metadataExpireMs - msSinceLastUpdate;
+        return Math.max(updateAllowed, updateNeeded);
     }
     /**
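Note on the Metadata change: needsUpdate() no longer answers a yes/no question; it now returns the number of milliseconds until the next refresh is due, where a value <= 0 means "refresh now". A rough worked example of the new arithmetic, using illustrative settings that are not from the patch (refreshBackoffMs = 100, metadataExpireMs = 5000):

    // 30 ms after the last refresh, with forceUpdate set:
    //   updateAllowed = max(100 - 30, 0) = 70
    //   updateNeeded  = 0                      (forceUpdate short-circuits the expiry check)
    //   needsUpdate   = max(70, 0)       = 70  -> wait ~70 ms before refreshing
    // 130 ms after the last refresh, with forceUpdate set:
    //   needsUpdate   = max(max(100 - 130, 0), 0) = 0  -> refresh immediately
    long waitMs = metadata.needsUpdate(nowMs);
    if (waitMs <= 0) {
        // a metadata request should be sent now
    } else {
        // nothing to do for up to waitMs milliseconds (the Sender below uses this as its poll timeout)
    }

The extra Selectable argument to fetch() lets a blocked caller wake the network thread, so a forced update is picked up immediately instead of waiting for the next poll cycle.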
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4010d42..bde4d90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.Pair;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -122,7 +123,7 @@ public final class RecordAccumulator {
      * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public Pair append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch
@@ -132,7 +133,7 @@ public final class RecordAccumulator {
         if (batch != null) {
             FutureRecordMetadata future = batch.tryAppend(key, value, callback);
             if (future != null)
-                return future;
+                return new Pair(future, dq.size() > 1 || batch.records.isFull());
         }
     }
@@ -148,15 +149,15 @@ public final class RecordAccumulator {
                 // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                 // often...
                 free.deallocate(buffer);
-                return future;
+                return new Pair(future, dq.size() > 1 || last.records.isFull());
             }
         }
-        MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+        MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, batchSize);
         RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
         FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
         dq.addLast(batch);
-        return future;
+        return new Pair(future, dq.size() > 1 || batch.records.isFull());
     }
 }
@@ -185,9 +186,10 @@ public final class RecordAccumulator {
      *
      * • The accumulator has been closed
      *
      */
-    public Set ready(Cluster cluster, long nowMs) {
+    public Pair<Set<Node>, Long> ready(Cluster cluster, long nowMs) {
         Set readyNodes = new HashSet();
         boolean exhausted = this.free.queued() > 0;
+        long minLingerMs = lingerMs;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             TopicPartition part = entry.getKey();
@@ -200,16 +202,22 @@ public final class RecordAccumulator {
                     if (batch != null) {
                         boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                         boolean full = deque.size() > 1 || batch.records.isFull();
-                        boolean expired = nowMs - batch.createdMs >= lingerMs;
+                        long waitedTimeMs = nowMs - batch.createdMs;
+                        boolean expired = waitedTimeMs >= lingerMs;
                         boolean sendable = full || expired || exhausted || closed;
                         if (sendable && !backingOff)
                             readyNodes.add(leader);
+                        long remainingLingerTimeMs = Math.max(
+                            backingOff ? batch.lastAttemptMs + retryBackoffMs - nowMs : lingerMs - waitedTimeMs,
+                            0);
+                        if (remainingLingerTimeMs < minLingerMs)
+                            minLingerMs = remainingLingerTimeMs;
                     }
                 }
             }
         }
-        return readyNodes;
+        return new Pair<Set<Node>, Long>(readyNodes, minLingerMs);
     }
     /**
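Note on the accumulator change: ready() now returns both the set of nodes with sendable data and the smallest remaining linger/backoff delay across all batches, so the caller knows how long it can safely block when nothing is sendable yet. A small worked example with illustrative numbers and names (lingerMs = 100, one batch created 70 ms ago, not full, accumulator neither exhausted nor closed):

    //   waitedTimeMs          = 70
    //   remainingLingerTimeMs = max(100 - 70, 0) = 30
    //   ready(...)            = Pair(<empty node set>, 30)  -> nothing sendable for ~30 ms
    Pair<Set<Node>, Long> readyAndLinger = accumulator.ready(cluster, time.milliseconds());
    Set<Node> readyNodes = readyAndLinger.p1;  // leaders that have sendable batches
    long maxBlockMs = readyAndLinger.p2;       // upper bound on how long the sender may poll

Similarly, append() now reports through its Boolean whether the appended record filled a batch or opened a new one, which is what allows KafkaProducer.send() above to skip the wakeup call on the common middle-of-a-batch path.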
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4352466..94a0311 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Pair;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -62,7 +63,7 @@ public class Sender implements Runnable {
     private final RecordAccumulator accumulator;
     /* the selector used to perform network i/o */
-    private final Selectable selector;
+    public final Selectable selector;
     /* the client id used to identify this client in requests to the server */
     private final String clientId;
@@ -185,11 +186,13 @@ public class Sender implements Runnable {
     public void run(long nowMs) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        Set ready = this.accumulator.ready(cluster, nowMs);
+        Pair<Set<Node>, Long> readyAndLingerMs = this.accumulator.ready(cluster, nowMs);
+
+        Set ready = readyAndLingerMs.p1;
         // should we update our metadata?
         List sends = new ArrayList();
-        maybeUpdateMetadata(cluster, sends, nowMs);
+        long metadataLingerMs = maybeUpdateMetadata(cluster, sends, nowMs);
         // prune the list of ready nodes to eliminate any that we aren't ready to send yet
         Set sendable = processReadyNode(ready, nowMs);
@@ -211,9 +214,11 @@ public class Sender implements Runnable {
             sends.add(request.request);
         }
+        long selectTimeMs = sends.size() > 0 ? 0 : Math.min(readyAndLingerMs.p2, metadataLingerMs);
+
         // do the I/O
         try {
-            this.selector.poll(100L, sends);
+            this.selector.poll(selectTimeMs, sends);
         } catch (IOException e) {
             log.error("Unexpected error during I/O in producer network thread", e);
         }
@@ -228,13 +233,14 @@ public class Sender implements Runnable {
     /**
     * Add a metadata request to the list of sends if we need to make one
     */
-    private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) {
-        if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs))
-            return;
+    private long maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) {
+        long metadataLingerMs = metadata.needsUpdate(nowMs);
+        if (this.metadataFetchInProgress || metadataLingerMs > 0)
+            return metadataLingerMs;
         Node node = selectMetadataDestination(cluster);
         if (node == null)
-            return;
+            return metadataLingerMs;
         if (nodeStates.isConnected(node.id())) {
             Set topics = metadata.topics();
@@ -247,6 +253,7 @@ public class Sender implements Runnable {
             // we don't have a connection to this node right now, make one
             initiateConnect(node, nowMs);
         }
+        return metadataLingerMs;
     }
     /**
@@ -768,6 +775,7 @@ public class Sender implements Runnable {
         this.batchSizeSensor = metrics.sensor("batch-size");
         this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+        this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max());
         this.compressionRateSensor = metrics.sensor("compression-rate");
         this.compressionRateSensor.add("compression-rate-avg",
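Net effect in the Sender: instead of waking every 100 ms, the I/O thread computes how long it may sleep — zero if there is anything to write, otherwise the smaller of the accumulator's remaining linger time and the time until the next metadata refresh — and a selector.wakeup() from send() or fetch() can still cut a long poll short. With the illustrative numbers from the note above:

    // nearest batch matures in 30 ms, next metadata refresh due in 70 ms, nothing queued to write:
    long selectTimeMs = sends.size() > 0 ? 0 : Math.min(30L, 70L);  // -> poll(30, sends)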
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 15c9577..9560dea 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.record;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.AbstractIterator;
+
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -23,9 +26,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.AbstractIterator;
-
 /**
  * A {@link Records} implementation backed by a ByteBuffer.
  */
@@ -35,9 +35,10 @@ public class MemoryRecords implements Records {
     private final int capacity;
     private ByteBuffer buffer;
     private boolean writable;
+    private final int idealBatchSize;
     // Construct a writable memory records
-    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int idealBatchSize) {
         this.writable = writable;
         this.capacity = buffer.capacity();
         if (this.writable) {
@@ -47,14 +48,15 @@ public class MemoryRecords implements Records {
             this.buffer = buffer;
             this.compressor = null;
         }
+        this.idealBatchSize = idealBatchSize;
     }
-    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
-        return new MemoryRecords(buffer, type, true);
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int idealBatchSize) {
+        return new MemoryRecords(buffer, type, true, idealBatchSize);
     }
     public static MemoryRecords iterableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer, CompressionType.NONE, false);
+        return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
     }
     /**
@@ -95,11 +97,13 @@ public class MemoryRecords implements Records {
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
         return this.writable &&
-            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
+            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) &&
+            this.idealBatchSize > this.compressor.estimatedBytesWritten();
     }
     public boolean isFull() {
-        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
+            this.idealBatchSize <= this.compressor.estimatedBytesWritten();
     }
     /**
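Note on MemoryRecords: the buffer backing a batch can be larger than the configured batch size (for example when a single record is bigger than batch.size, as exercised by the RecordAccumulatorTest change below), so fullness is now judged against an "ideal" batch size rather than raw buffer capacity. A minimal sketch of the resulting behaviour, with illustrative sizes:

    byte[] key = "k".getBytes();
    byte[] value = new byte[512];
    // 16 KB buffer, but a 1 KB target batch size:
    MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(16 * 1024), CompressionType.NONE, 1024);
    // While estimatedBytesWritten() is below 1024:
    //   records.hasRoomFor(key, value) -> true,  records.isFull() -> false
    // Once roughly 1 KB has been appended (far below the 16 KB capacity):
    //   records.hasRoomFor(key, value) -> false, records.isFull() -> true

iterableRecords() passes buffer.capacity() as the ideal size, so read-only record sets are unaffected.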
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Pair.java b/clients/src/main/java/org/apache/kafka/common/utils/Pair.java
new file mode 100644
index 0000000..ecde859
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Pair.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+public class Pair<P1, P2> {
+    public final P1 p1;
+    public final P2 p2;
+    public Pair(P1 p1, P2 p2) {
+        this.p1 = p1;
+        this.p2 = p2;
+    }
+}
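Pair is a minimal two-field holder used above to return two values from one call: append() hands back the record's future plus a "wake the sender" flag, and ready() hands back the ready nodes plus the time until the next batch matures. A trivial, purely hypothetical usage example:

    Pair<String, Long> nameAndCount = new Pair<String, Long>("batches-sent", 42L);
    String name = nameAndCount.p1;
    long count = nameAndCount.p2;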
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 8b4ac0f..09ace65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -30,11 +30,11 @@ public class MetadataTest {
     public void testMetadata() throws Exception {
         long time = 0;
         metadata.update(Cluster.empty(), time);
-        assertFalse("No update needed.", metadata.needsUpdate(time));
+        assertFalse("No update needed.", metadata.needsUpdate(time) <= 0);
         metadata.forceUpdate();
-        assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time));
+        assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time) <= 0);
         time += refreshBackoffMs;
-        assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
+        assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time) <= 0);
         String topic = "my-topic";
         Thread t1 = asyncFetch(topic);
         Thread t2 = asyncFetch(topic);
@@ -43,15 +43,15 @@ public class MetadataTest {
         metadata.update(TestUtils.singletonCluster(topic, 1), time);
         t1.join();
         t2.join();
-        assertFalse("No update needed.", metadata.needsUpdate(time));
+        assertFalse("No update needed.", metadata.needsUpdate(time) <= 0);
         time += metadataExpireMs;
-        assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
+        assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time) <= 0);
     }
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
-                metadata.fetch(topic, Integer.MAX_VALUE);
+                metadata.fetch(topic, Integer.MAX_VALUE, null);
             }
         };
         thread.start();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index c4072ae..e1a7ffb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -57,10 +57,10 @@ public class RecordAccumulatorTest {
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
-            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size());
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).p1.size());
         }
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).p1);
         List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
@@ -78,7 +78,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).p1);
     }
     @Test
@@ -86,9 +86,9 @@ public class RecordAccumulatorTest {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size());
+        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).p1.size());
         time.sleep(10);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).p1);
         List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
@@ -108,7 +108,7 @@ public class RecordAccumulatorTest {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).p1);
         List batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
@@ -139,7 +139,7 @@ public class RecordAccumulatorTest {
         int read = 0;
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
-            Set nodes = accum.ready(cluster, now);
+            Set nodes = accum.ready(cluster, now).p1;
             List batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 3ef692c..73200e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -82,7 +82,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
-        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).p1;
         sender.run(time.milliseconds());
         assertEquals("We should have connected", 1, selector.connected().size());
         selector.clear();
@@ -120,7 +120,7 @@ public class SenderTest {
             MAX_IN_FLIGHT_REQS,
             new Metrics(),
             time);
-        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).p1;
         RequestSend request1 = completeSend(sender);
         selector.clear();
         selector.completeReceive(produceResponse(request1.header().correlationId(),
@@ -146,7 +146,7 @@ public class SenderTest {
     @Test
     public void testMetadataRefreshOnNoLeaderException() throws Exception {
-        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).p1;
         RequestSend request = completeSend();
         selector.clear();
         selector.completeReceive(produceResponse(request.header().correlationId(),
@@ -157,18 +157,18 @@ public class SenderTest {
             Errors.NOT_LEADER_FOR_PARTITION.code()));
         sender.run(time.milliseconds());
         completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION);
-        assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+        assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()) <= 0);
     }
     @Test
     public void testMetadataRefreshOnDisconnect() throws Exception {
-        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).p1;
         completeSend();
         selector.clear();
         selector.disconnect(cluster.leaderFor(tp).id());
         sender.run(time.milliseconds());
         completedWithError(future, Errors.NETWORK_EXCEPTION);
-        assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+        assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()) <= 0);
     }
     private void completedWithError(Future future, Errors error) throws Exception {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 94a1112..8e56572 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -39,8 +39,8 @@ public class MemoryRecordsTest {
     @Test
     public void testIterator() {
-        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
-        MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression, 1024);
+        MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression, 1024);
         List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
                                   new Record("b".getBytes(), "2".getBytes()),
                                   new Record("c".getBytes(), "3".getBytes()));
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e75c4f8..539a648 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,6 +31,8 @@ import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountD
 import joptsimple.OptionParser
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
+import java.util.Random
+import java.util.concurrent.atomic.AtomicInteger
 object MirrorMaker extends Logging {
@@ -136,7 +138,7 @@ object MirrorMaker extends Logging {
     val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
     producerThreads = new ListBuffer[ProducerThread]()
-    var producerIndex: Int = 1
+    var producerIndex: Int = 0
     for(producer <- producers) {
       val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex)
       producerThreads += producerThread
@@ -186,15 +188,11 @@ object MirrorMaker extends Logging {
   }
   class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
-
-    val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
-
-    newGauge(
-      "MirrorMaker-DataChannel-Size",
-      new Gauge[Int] {
-        def value = queue.size
-      }
-    )
+    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
+    private val counter = new AtomicInteger(new Random().nextInt())
+    val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
+    for (i <- 0 until numConsumers)
+      queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
     // We use a single meter for aggregated wait percentage for the data channel.
     // Since meter is calculated as total_recorded_value / time_window and
@@ -204,21 +202,32 @@ object MirrorMaker extends Logging {
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
     def put(record: ProducerRecord) {
+      val queueId =
+        if (record.key() != null) {
+          Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
+        } else {
+          Utils.abs(counter.getAndIncrement()) % numConsumers
+        }
+
+      val queue = queues(queueId)
       var putSucceed = false
       while (!putSucceed) {
        val startPutTime = SystemTime.nanoseconds
        putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
      }
+      channelSizeHist.update(queue.size)
     }
-    def take(): ProducerRecord = {
+    def take(queueId: Int): ProducerRecord = {
+      val queue = queues(queueId)
       var data: ProducerRecord = null
       while (data == null) {
        val startTakeTime = SystemTime.nanoseconds
        data = queue.poll(500, TimeUnit.MILLISECONDS)
        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
      }
+      channelSizeHist.update(queue.size)
       data
     }
   }
@@ -239,18 +248,8 @@ object MirrorMaker extends Logging {
     info("Starting mirror maker consumer thread " + threadName)
     try {
       for (msgAndMetadata <- stream) {
-        // If the key of the message is empty, put it into the universal channel
-        // Otherwise use a pre-assigned producer to send the message
-        if (msgAndMetadata.key == null) {
-          trace("Send the non-keyed message the producer channel.")
-          val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
-          mirrorDataChannel.put(data)
-        } else {
-          val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
-          trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
-          val producer = producers(producerId)
-          producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
-        }
+        val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+        mirrorDataChannel.put(data)
       }
     } catch {
       case e: Throwable =>
@@ -284,7 +283,7 @@ object MirrorMaker extends Logging {
     info("Starting mirror maker producer thread " + threadName)
     try {
       while (true) {
-        val data: ProducerRecord = dataChannel.take
+        val data: ProducerRecord = dataChannel.take(threadId)
         trace("Sending message with value size %d".format(data.value().size))
        if(data eq shutdownMessage) {
          info("Received shutdown message")
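Note on the MirrorMaker change: the single shared queue becomes an array of queues, one drained by each producer thread, and every record is routed by key hash (or round-robin for keyless records), so all records with a given key flow through the same queue to the same producer and keep their relative order. A small Java restatement of the routing rule (the patch itself is Scala; the class and names below are illustrative only):

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    final class QueueChooser {
        private final AtomicInteger counter = new AtomicInteger();
        private final int numQueues;

        QueueChooser(int numQueues) { this.numQueues = numQueues; }

        int chooseQueue(byte[] key) {
            // Keyed records always map to the same queue, preserving per-key ordering;
            // keyless records are spread round-robin across the queues.
            int hash = (key != null) ? Arrays.hashCode(key) : counter.getAndIncrement();
            return (hash & 0x7fffffff) % numQueues;  // keep the index non-negative (the patch uses Utils.abs for this)
        }
    }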