diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 522881c..a287d3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient { /** * Check if the node with the given id is ready to send more requests. - * @param nodeId The node id + * @param node The given node id * @param now The current time in ms * @return true if the node is ready */ @@ -126,8 +126,8 @@ public class NetworkClient implements KafkaClient { } private boolean isReady(int node, long now) { - if (this.metadata.needsUpdate(now)) - // if we need to update our metadata declare all requests unready to metadata requests first priority + if (this.metadata.timeToNextUpdate(now) == 0) + // if we need to update our metadata now declare all requests unready to make metadata requests first priority return false; else // otherwise we are ready if we are connected and can send more requests @@ -146,7 +146,7 @@ public class NetworkClient implements KafkaClient { public List poll(List requests, long timeout, long now) { // should we update our metadata? List sends = new ArrayList(); - maybeUpdateMetadata(sends, now); + long metadataTimeout = maybeUpdateMetadata(sends, now); for (int i = 0; i < requests.size(); i++) { ClientRequest request = requests.get(i); @@ -158,9 +158,9 @@ public class NetworkClient implements KafkaClient { sends.add(request.request()); } - // do the I/O + // do the I/O, with the select time to be the minimal of the partition ready timeout and the metadata timeout try { - this.selector.poll(timeout, sends); + this.selector.poll(Math.min(timeout, metadataTimeout), sends); } catch (IOException e) { log.error("Unexpected error during I/O in producer network thread", e); } @@ -340,15 +340,17 @@ public class NetworkClient implements KafkaClient { } /** - * Add a metadata request to the list of sends if we need to make one + * Add a metadata request to the list of sends if we need to make one, + * and return the time we can wait for the next time to update metadata */ - private void maybeUpdateMetadata(List sends, long now) { - if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) - return; + private long maybeUpdateMetadata(List sends, long now) { + long timeToNextUpdate = metadata.timeToNextUpdate(now); + if (this.metadataFetchInProgress || timeToNextUpdate > 0) + return timeToNextUpdate; Node node = this.leastLoadedNode(now); if (node == null) - return; + return timeToNextUpdate; if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); @@ -361,6 +363,7 @@ public class NetworkClient implements KafkaClient { // we don't have a connection to this node right now, make one initiateConnect(node, now); } + return timeToNextUpdate; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 00775ab..ac0fb61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -22,7 +22,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.NetworkClient; -import 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.clients.producer.internals.RecordAccumulator; @@ -232,10 +231,12 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); - this.sender.wakeup(); - return future; - // For API exceptions return them in the future; + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback); + if (result.needWakeupSender) + this.sender.wakeup(); + return result.future; + // Handling exceptions and record the errors; + // For API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); @@ -246,6 +247,9 @@ public class KafkaProducer implements Producer { } catch (InterruptedException e) { this.errors.record(); throw new KafkaException(e); + } catch (KafkaException e) { + this.errors.record(); + throw e; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 57bc285..8890aa2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -101,15 +101,14 @@ public final class Metadata { } /** - * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs - * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more - * than metadataExpireMs has passed since the last refresh) + * The next time to update the cluster info is the maximum of the time the current info will expire + * and the time the current info can be updated (i.e. backoff time has elapsed); If an update has + * been request then the expiry time is now */ - public synchronized boolean needsUpdate(long now) { - long msSinceLastUpdate = now - this.lastRefreshMs; - boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs; - boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs; - return updateAllowed && updateNeeded; + public synchronized long timeToNextUpdate(long nowMs) { + long timeToExpire = forceUpdate ? 
0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); + long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; + return Math.max(timeToExpire, timeToAllowUpdate); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 1ed3c28..711bb3b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -51,6 +51,28 @@ import org.slf4j.LoggerFactory; */ public final class RecordAccumulator { + public class RecordAppendResult { + public FutureRecordMetadata future; + public boolean needWakeupSender; + + public RecordAppendResult(FutureRecordMetadata future, boolean needWakeupSender) { + this.future = future; + this.needWakeupSender = needWakeupSender; + } + } + + public class ReadyCheckResult { + public Set readyNodes; + public long timeToExpireMs; + + public ReadyCheckResult(Set readyNodes, long timeToExpireMs) { + this.readyNodes = readyNodes; + this.timeToExpireMs = timeToExpireMs; + } + } + + public static final long DEFAULT_PARTITION_EXPIRY_MS = 100L; + private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); private volatile boolean closed; @@ -120,27 +142,33 @@ public final class RecordAccumulator { } /** - * Add a record to the accumulator. + * Add a record to the accumulator, return the future metadata and indicate whether the appended partition is ready now. *

 * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
-     *
+     *
+     * An appended partition is ready if:
+     * 1. The current batch's record set is full
+     * 2. A new batch has been created for this new message append and the linger time is smaller than the default select timeout
+ * * @param tp The topic/partition to which this record is being sent * @param key The key for the record * @param value The value for the record * @param compression The compression codec for the record * @param callback The user-supplied callback to execute when the request is complete */ - public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); // check if we have an in-progress batch Deque dq = dequeFor(tp); synchronized (dq) { - RecordBatch batch = dq.peekLast(); - if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, callback); + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) - return future; + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull()); } } @@ -156,15 +184,15 @@ public final class RecordAccumulator { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... free.deallocate(buffer); - return future; + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull()); } } - MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression); + MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); dq.addLast(batch); - return future; + return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull() || this.lingerMs < DEFAULT_PARTITION_EXPIRY_MS); } } @@ -181,7 +209,8 @@ public final class RecordAccumulator { } /** - * Get a list of nodes whose partitions are ready to be sent. + * Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no + * partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. *

* A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the * following are true : @@ -193,9 +222,10 @@ public final class RecordAccumulator { *

  • The accumulator has been closed * */ - public Set ready(Cluster cluster, long now) { + public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set readyNodes = new HashSet(); boolean exhausted = this.free.queued() > 0; + long timeToExpireMs = DEFAULT_PARTITION_EXPIRY_MS; for (Map.Entry> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); @@ -206,18 +236,22 @@ public final class RecordAccumulator { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > now; + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; + long waitedTimeMs = nowMs - batch.lastAttemptMs; + long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; + long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = now - batch.createdMs >= lingerMs; + boolean expired = waitedTimeMs >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) readyNodes.add(leader); + timeToExpireMs = Math.min(timeLeftMs, timeToExpireMs); } } } } - return readyNodes; + return new ReadyCheckResult(readyNodes, timeToExpireMs); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 6fb5b82..665d910 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -144,10 +144,10 @@ public class Sender implements Runnable { public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - Set ready = this.accumulator.ready(cluster, now); + RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // remove any nodes we aren't ready to send to - Iterator iter = ready.iterator(); + Iterator iter = result.readyNodes.iterator(); while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) @@ -155,16 +155,16 @@ public class Sender implements Runnable { } // create produce requests - Map> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now); + Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); List requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); - if (ready.size() > 0) { - log.trace("Nodes with data ready to send: {}", ready); + if (result.readyNodes.size() > 0) { + log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); } - List responses = this.client.poll(requests, 100L, now); + List responses = this.client.poll(requests, result.timeToExpireMs, now); for (ClientResponse response : responses) { if (response.wasDisconnected()) handleDisconnect(response, now); @@ -307,6 +307,7 @@ public class Sender implements Runnable { this.batchSizeSensor = metrics.sensor("batch-size"); this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); + this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max()); this.compressionRateSensor = metrics.sensor("compression-rate"); this.compressionRateSensor.add("compression-rate-avg", "The average 
compression rate of record batches.", new Avg()); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 759f577..040e5b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -29,13 +29,15 @@ public class MemoryRecords implements Records { private final Compressor compressor; private final int capacity; + private final int sizeLimit; private ByteBuffer buffer; private boolean writable; // Construct a writable memory records - private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) { + private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) { this.writable = writable; this.capacity = buffer.capacity(); + this.sizeLimit = sizeLimit; if (this.writable) { this.buffer = null; this.compressor = new Compressor(buffer, type); @@ -45,12 +47,16 @@ public class MemoryRecords implements Records { } } + public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) { + return new MemoryRecords(buffer, type, true, capacity); + } + public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) { - return new MemoryRecords(buffer, type, true); + return emptyRecords(buffer, type, buffer.capacity()); } public static MemoryRecords iterableRecords(ByteBuffer buffer) { - return new MemoryRecords(buffer, CompressionType.NONE, false); + return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity()); } /** @@ -88,14 +94,22 @@ public class MemoryRecords implements Records { * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be * accurate if compression is really used. When this happens, the following append may cause dynamic buffer * re-allocation in the underlying byte buffer stream. + * + * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be + * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the + * capacity will be the message size, but the size limit will still be the batch size), and when the records' size has + * exceed this limit we also mark this record as full. 
*/ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + - Record.recordSize(key, value); + return this.writable && + this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) && + this.sizeLimit >= this.compressor.estimatedBytesWritten(); } public boolean isFull() { - return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten(); + return !this.writable || + this.capacity <= this.compressor.estimatedBytesWritten() || + this.sizeLimit <= this.compressor.estimatedBytesWritten(); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 6a3cdcc..2f98192 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -54,7 +54,7 @@ public class NetworkClientTest { client.poll(reqs, 1, time.milliseconds()); selector.clear(); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); - assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds())); + assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0); } @Test(expected = IllegalStateException.class) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 8b4ac0f..0d7d04c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -30,11 +30,11 @@ public class MetadataTest { public void testMetadata() throws Exception { long time = 0; metadata.update(Cluster.empty(), time); - assertFalse("No update needed.", metadata.needsUpdate(time)); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); metadata.forceUpdate(); - assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time)); + assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); time += refreshBackoffMs; - assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time)); + assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); String topic = "my-topic"; Thread t1 = asyncFetch(topic); Thread t2 = asyncFetch(topic); @@ -43,9 +43,9 @@ public class MetadataTest { metadata.update(TestUtils.singletonCluster(topic, 1), time); t1.join(); t2.join(); - assertFalse("No update needed.", metadata.needsUpdate(time)); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); time += metadataExpireMs; - assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time)); + assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); } private Thread asyncFetch(final String topic) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 93b58d0..0762b35 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -62,10 +62,10 @@ 
public class RecordAccumulatorTest { int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -83,7 +83,7 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test @@ -91,9 +91,9 @@ public class RecordAccumulatorTest { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size()); + assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -113,7 +113,7 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds())); + assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); @@ -144,7 +144,7 @@ public class RecordAccumulatorTest { int read = 0; long now = time.milliseconds(); while (read < numThreads * msgs) { - Set nodes = accum.ready(cluster, now); + Set nodes = accum.ready(cluster, now).readyNodes; List batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id()); if (batches != null) { for (RecordBatch batch : batches) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 5489aca..ef2ca65 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -69,7 +69,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { int offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -95,7 +95,7 @@ public class SenderTest { new Metrics(), time); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -112,7 +112,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 7638391..4f06e34 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -21,22 +21,22 @@ import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} import kafka.consumer._ import kafka.serializer._ import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} +import kafka.metrics.KafkaMetricsGroup + import org.apache.kafka.clients.producer.ProducerRecord -import scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._ -import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} - import joptsimple.OptionParser -import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge +import java.util.Random +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null - private var producerThreads: ListBuffer[ProducerThread] = null + private var producerThreads: Seq[ProducerThread] = null private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) @@ -138,13 +138,7 @@ object MirrorMaker extends Logging { // create a data channel btw the consumers and the producers val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers) - producerThreads = new ListBuffer[ProducerThread]() - var producerIndex: Int = 1 - for(producer <- producers) { - val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex) - producerThreads += producerThread - producerIndex += 1 - } + producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, 
producerAndIndex._2)) val filterSpec = if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt)) @@ -190,14 +184,11 @@ object MirrorMaker extends Logging { class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup { - val queue = new ArrayBlockingQueue[ProducerRecord](capacity) + val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers) + for (i <- 0 until numConsumers) + queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity) - newGauge( - "MirrorMaker-DataChannel-Size", - new Gauge[Int] { - def value = queue.size - } - ) + private val counter = new AtomicInteger(new Random().nextInt()) // We use a single meter for aggregated wait percentage for the data channel. // Since meter is calculated as total_recorded_value / time_window and @@ -205,23 +196,37 @@ object MirrorMaker extends Logging { // time should be discounted by # threads. private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) + private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") def put(record: ProducerRecord) { + // If the key of the message is empty, use round-robin to select the queue + // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue + val queueId = + if(record.key() != null) { + Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers + } else { + Utils.abs(counter.getAndIncrement()) % numConsumers + } + val queue = queues(queueId) + var putSucceed = false while (!putSucceed) { val startPutTime = SystemTime.nanoseconds putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) } + channelSizeHist.update(queue.size) } - def take(): ProducerRecord = { + def take(queueId: Int): ProducerRecord = { + val queue = queues(queueId) var data: ProducerRecord = null while (data == null) { val startTakeTime = SystemTime.nanoseconds data = queue.poll(500, TimeUnit.MILLISECONDS) waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) } + channelSizeHist.update(queue.size) data } } @@ -242,18 +247,8 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { - // If the key of the message is empty, put it into the universal channel - // Otherwise use a pre-assigned producer to send the message - if (msgAndMetadata.key == null) { - trace("Send the non-keyed message the producer channel.") - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) - mirrorDataChannel.put(data) - } else { - val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size() - trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) - val producer = producers(producerId) - producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) - } + val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) + mirrorDataChannel.put(data) } } catch { case e: Throwable => @@ -287,7 +282,7 @@ object MirrorMaker extends Logging { info("Starting mirror maker producer thread " + threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take + val data: ProducerRecord = dataChannel.take(threadId) trace("Sending message with value size %d".format(data.value().size)) 
if(data eq shutdownMessage) { info("Received shutdown message")
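
Note on the new Metadata.timeToNextUpdate() contract above: 0 means "update now", any larger value is the number of milliseconds to wait. A minimal standalone sketch of that rule (invented field values, not the real producer internals) behaves the same way the MetadataTest assertions in this patch do:

// Sketch of the timeToNextUpdate() rule from this patch; values are invented and this is not the real Metadata class.
public final class MetadataTimingSketch {
    private final long lastRefreshMs = 0L;        // time of the last successful refresh
    private final long metadataExpireMs = 60000L; // how long cached metadata stays fresh
    private final long refreshBackoffMs = 100L;   // minimum gap between refresh attempts
    private boolean forceUpdate = false;          // set when an update was explicitly requested

    // 0 means an update is both needed and allowed right now; otherwise the ms to wait.
    public long timeToNextUpdate(long nowMs) {
        // A requested update treats the cached info as already expired...
        long timeToExpire = forceUpdate ? 0 : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
        // ...but the refresh backoff is still honored.
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        MetadataTimingSketch m = new MetadataTimingSketch();
        m.forceUpdate = true;
        System.out.println(m.timeToNextUpdate(0L));   // 100: backoff has not elapsed yet
        System.out.println(m.timeToNextUpdate(100L)); // 0: update is both needed and allowed
    }
}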
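The timeout plumbing in this patch can be traced end to end: RecordAccumulator.ready() reports how long until the oldest batch lingers out (capped at DEFAULT_PARTITION_EXPIRY_MS), Sender passes that value into NetworkClient.poll(), and NetworkClient bounds it further by the metadata timeout. A worked sketch with made-up numbers; the real classes appear only in comments:

// Worked example of the select-time calculation; all values are invented.
public class PollTimeoutSketch {
    public static void main(String[] args) {
        long lingerMs = 5L, retryBackoffMs = 100L, defaultPartitionExpiryMs = 100L;

        long nowMs = 1000L;
        long lastAttemptMs = 998L;   // the oldest batch for this partition was appended 2 ms ago
        boolean backingOff = false;  // only true for a retried batch still inside retryBackoffMs

        // RecordAccumulator.ready(): time left before the oldest batch becomes sendable
        long waitedTimeMs = nowMs - lastAttemptMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);             // 3 ms
        long timeToExpireMs = Math.min(timeLeftMs, defaultPartitionExpiryMs);   // min taken over all partitions

        // NetworkClient.poll(): bound the selector wait by the metadata timeout as well
        long metadataTimeout = 250L;  // whatever maybeUpdateMetadata() returned
        long selectTimeMs = Math.min(timeToExpireMs, metadataTimeout);
        System.out.println("selector would block for at most " + selectTimeMs + " ms"); // 3
    }
}

As far as the diff shows, the effect is that the sender no longer sleeps a fixed 100 ms per loop; it wakes when a batch is due, when metadata needs refreshing, or after DEFAULT_PARTITION_EXPIRY_MS at the latest, whichever comes first.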
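The capacity-versus-size-limit distinction added to MemoryRecords can be illustrated with a simplified model; "written" stands in for compressor.estimatedBytesWritten(), the numbers are invented, and this is not the real class:

// Simplified model of the two bounds now enforced by MemoryRecords.hasRoomFor()/isFull().
public class SizeLimitSketch {
    static boolean hasRoomFor(int capacity, int sizeLimit, int written, int nextRecordSize) {
        return capacity >= written + nextRecordSize   // the allocated buffer can hold the record
            && sizeLimit >= written;                  // and the configured batch size is not yet reached
    }

    static boolean isFull(int capacity, int sizeLimit, int written) {
        return capacity <= written || sizeLimit <= written;
    }

    public static void main(String[] args) {
        // A single 2 KB record with a 512-byte batch size: the buffer is allocated large enough to hold
        // the record (capacity = 2048) but the size limit stays at the batch size (512), so the batch
        // reports full right after the first append and becomes sendable immediately, matching the
        // oversized-record case in RecordAccumulatorTest above.
        int capacity = 2048, sizeLimit = 512, written = 2048;
        System.out.println(isFull(capacity, sizeLimit, written));          // true
        System.out.println(hasRoomFor(capacity, sizeLimit, written, 64));  // false
    }
}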