diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..90cacbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -283,7 +283,7 @@ public class KafkaProducer implements Producer { } public List partitionsFor(String topic) { - return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic); + return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 6a0f3b2..c0f1d57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -102,7 +102,7 @@ public class MockProducer implements Producer { @Override public synchronized Future send(ProducerRecord record, Callback callback) { int partition = 0; - if (this.cluster.partitionsFor(record.topic()) != null) + if (this.cluster.partitionsForTopic(record.topic()) != null) partition = partitioner.partition(record, this.cluster); ProduceRequestResult result = new ProduceRequestResult(); FutureRecordMetadata future = new FutureRecordMetadata(result, 0); @@ -133,7 +133,7 @@ public class MockProducer implements Producer { } public List partitionsFor(String topic) { - return this.cluster.partitionsFor(topic); + return this.cluster.partitionsForTopic(topic); } public Map metrics() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index f114ffd..f47a461 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -81,7 +81,7 @@ public final class Metadata { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; do { - partitions = cluster.partitionsFor(topic); + partitions = cluster.partitionsForTopic(topic); if (partitions == null) { topics.add(topic); forceUpdate = true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index fbb732a..40e8234 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -41,10 +41,10 @@ public class Partitioner { * Compute the partition for the given record. * * @param record The record being sent - * @param numPartitions The total number of partitions for the given topic + * @param cluster The current cluster metadata */ public int partition(ProducerRecord record, Cluster cluster) { - List partitions = cluster.partitionsFor(record.topic()); + List partitions = cluster.partitionsForTopic(record.topic()); int numPartitions = partitions.size(); if (record.partition() != null) { // they have given us a partition, use it diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 2d7e52d..77b0d32 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,15 +13,13 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; @@ -48,6 +46,7 @@ public final class RecordAccumulator { private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); private volatile boolean closed; + private volatile int readyPartitions; private int drainIndex; private final int batchSize; private final long lingerMs; @@ -79,6 +78,7 @@ public final class RecordAccumulator { Metrics metrics, Time time) { this.drainIndex = 0; + this.readyPartitions = 0; this.closed = false; this.batchSize = batchSize; this.lingerMs = lingerMs; @@ -113,7 +113,7 @@ public final class RecordAccumulator { }); metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() { public double measure(MetricConfig config, long nowMs) { - return ready(nowMs).size(); + return readyPartitions; } }); } @@ -180,9 +180,10 @@ public final class RecordAccumulator { } /** - * Get a list of topic-partitions which are ready to be sent. + * Get a list of nodes whose partitions are ready to be sent. *

- * A partition is ready if ANY of the following are true: + * A destination node is ready to send data if ANY one of its partition is not backing off the send + * and ANY of the following are true : *

    *
  1. The record set is full *
  2. The record set has sat in the accumulator for at least lingerMs milliseconds @@ -191,24 +192,37 @@ public final class RecordAccumulator { *
  3. The accumulator has been closed *
*/ - public List ready(long nowMs) { - List ready = new ArrayList(); + public List ready(Cluster cluster, long nowMs) { + List readyNodes = new ArrayList(); + int readyPartitions = 0; boolean exhausted = this.free.queued() > 0; + for (Map.Entry> entry : this.batches.entrySet()) { + TopicPartition part = entry.getKey(); Deque deque = entry.getValue(); - synchronized (deque) { - RecordBatch batch = deque.peekFirst(); - if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; - boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = nowMs - batch.createdMs >= lingerMs; - boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) - ready.add(batch.topicPartition); + // if the leader is unknown use an Unknown node placeholder + Node leader = cluster.leaderFor(part); + if (leader == null) leader = Node.Unknown; + if (!readyNodes.contains(leader)) { + synchronized (deque) { + RecordBatch batch = deque.peekFirst(); + if (batch != null) { + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; + boolean full = deque.size() > 1 || batch.records.isFull(); + boolean expired = nowMs - batch.createdMs >= lingerMs; + boolean sendable = full || expired || exhausted || closed; + if (sendable && !backingOff) { + readyNodes.add(leader); + if (!leader.equals(Node.Unknown)) + readyPartitions += cluster.partitionsForNode(leader.id()).size(); + } + } } } } - return ready; + this.readyPartitions = readyPartitions; + + return readyNodes; } /** @@ -226,45 +240,55 @@ public final class RecordAccumulator { } /** - * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts - * to avoid choosing the same topic-partitions over and over. + * Drain all the data for the given nodes and collate them into a list of + * batches that will fit within the specified size on a per-node basis. + * This method attempts to avoid choosing the same topic-node over and over. * - * @param partitions The list of partitions to drain + * @param cluster The current cluster metadata + * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain * @param nowMs The current unix time in milliseconds - * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize. + * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. * TODO: There may be a starvation issue due to iteration order */ - public List drain(List partitions, int maxSize, long nowMs) { - if (partitions.isEmpty()) - return Collections.emptyList(); - int size = 0; - List ready = new ArrayList(); - /* to make starvation less likely this loop doesn't start at 0 */ - int start = drainIndex = drainIndex % partitions.size(); - do { - TopicPartition tp = partitions.get(drainIndex); - Deque deque = dequeFor(tp); - if (deque != null) { - synchronized (deque) { - RecordBatch first = deque.peekFirst(); - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request - return ready; - } else { - RecordBatch batch = deque.pollFirst(); - batch.records.close(); - size += batch.records.sizeInBytes(); - ready.add(batch); - batch.drainedMs = nowMs; + public Map> drain(Cluster cluster, List nodes, int maxSize, long nowMs) { + if (nodes.isEmpty()) + return Collections.emptyMap(); + + Map> batches = new HashMap>(); + for (Node node : nodes) { + int size = 0; + List parts = cluster.partitionsForNode(node.id()); + List ready = new ArrayList(); + /* to make starvation less likely this loop doesn't start at 0 */ + int start = drainIndex = drainIndex % parts.size(); + do { + PartitionInfo part = parts.get(drainIndex); + Deque deque = dequeFor(new TopicPartition(part.topic(), part.partition())); + if (deque != null) { + synchronized (deque) { + RecordBatch first = deque.peekFirst(); + if (first != null) { + if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due + // to compression; in this case we will still eventually send this batch in a single + // request + break; + } else { + RecordBatch batch = deque.pollFirst(); + batch.records.close(); + size += batch.records.sizeInBytes(); + ready.add(batch); + batch.drainedMs = nowMs; + } + } } } - } - this.drainIndex = (this.drainIndex + 1) % partitions.size(); - } while (start != drainIndex); - return ready; + this.drainIndex = (this.drainIndex + 1) % parts.size(); + } while (start != drainIndex); + batches.put(node.id(), ready); + } + return batches; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f0152fa..8002bb3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -192,18 +192,18 @@ public class Sender implements Runnable { public void run(long nowMs) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - List ready = this.accumulator.ready(nowMs); + List ready = this.accumulator.ready(cluster, nowMs); // should we update our metadata? List sends = new ArrayList(); maybeUpdateMetadata(cluster, sends, nowMs); - // prune the list of ready topics to eliminate any that we aren't ready to send yet - List sendable = processReadyPartitions(cluster, ready, nowMs); + // prune the list of ready nodes to eliminate any that we aren't ready to send yet + List sendable = processReadyNode(ready, nowMs); // create produce requests - List batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs); - List requests = collate(cluster, batches, nowMs); + Map> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs); + List requests = generateProduceRequests(batches, nowMs); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { @@ -310,19 +310,21 @@ public class Sender implements Runnable { } /** - * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add - * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate - * metadata to be able to do so + * Process the set of destination nodes with data ready to send. + * + * 1) If we have an unknown leader node, force refresh the metadata. + * 2) If we have a connection to the appropriate node, add + * it to the returned set; + * 3) If we have not a connection yet, initialize one */ - private List processReadyPartitions(Cluster cluster, List ready, long nowMs) { - List sendable = new ArrayList(ready.size()); - for (TopicPartition tp : ready) { - Node node = cluster.leaderFor(tp); - if (node == null) { + private List processReadyNode(List ready, long nowMs) { + List sendable = new ArrayList(ready.size()); + for (Node node : ready) { + if (node.equals(Node.Unknown)) { // we don't know about this topic/partition or it has no leader, re-fetch metadata metadata.forceUpdate(); } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { - sendable.add(tp); + sendable.add(node); } else if (nodeStates.canConnect(node.id(), nowMs)) { // we don't have a connection to this node right now, make one initiateConnect(node, nowMs); @@ -520,19 +522,9 @@ public class Sender implements Runnable { } /** - * Collate the record batches into a list of produce requests on a per-node basis + * Transfer the record batches into a list of produce requests on a per-node basis */ - private List collate(Cluster cluster, List batches, long nowMs) { - Map> collated = new HashMap>(); - for (RecordBatch batch : batches) { - Node node = cluster.leaderFor(batch.topicPartition); - List found = collated.get(node.id()); - if (found == null) { - found = new ArrayList(); - collated.put(node.id(), found); - } - found.add(batch); - } + private List generateProduceRequests(Map> collated, long nowMs) { List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue())); diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 426bd1e..c62707a 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -12,6 +12,8 @@ */ package org.apache.kafka.common; +import org.apache.kafka.common.utils.Utils; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -28,6 +30,7 @@ public final class Cluster { private final List nodes; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; + private final Map> partitionsByNode; /** * Create a new cluster with the given nodes and partitions @@ -45,18 +48,32 @@ public final class Cluster { for (PartitionInfo p : partitions) this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p); - // index the partitions by topic and make the lists unmodifiable so we can handle them out in - // user-facing apis without risk of the client modifying the contents - HashMap> parts = new HashMap>(); + // index the partitions by topic and node respectively, and make the lists + // unmodifiable so we can hand them out in user-facing apis without risk + // of the client modifying the contents + HashMap> partsForTopic = new HashMap>(); + HashMap> partsForNode = new HashMap>(); + for (Node n : this.nodes) { + partsForNode.put(n.id(), new ArrayList()); + } for (PartitionInfo p : partitions) { - if (!parts.containsKey(p.topic())) - parts.put(p.topic(), new ArrayList()); - List ps = parts.get(p.topic()); - ps.add(p); + if (!partsForTopic.containsKey(p.topic())) + partsForTopic.put(p.topic(), new ArrayList()); + List psTopic = partsForTopic.get(p.topic()); + psTopic.add(p); + + if (p.leader() != null) { + List psNode = Utils.notNull(partsForNode.get(p.leader().id())); + psNode.add(p); + } } - this.partitionsByTopic = new HashMap>(parts.size()); - for (Map.Entry> entry : parts.entrySet()) + this.partitionsByTopic = new HashMap>(partsForTopic.size()); + for (Map.Entry> entry : partsForTopic.entrySet()) this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + this.partitionsByNode = new HashMap>(partsForNode.size()); + for (Map.Entry> entry : partsForNode.entrySet()) + this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + } /** @@ -113,10 +130,19 @@ public final class Cluster { * @param topic The topic name * @return A list of partitions */ - public List partitionsFor(String topic) { + public List partitionsForTopic(String topic) { return this.partitionsByTopic.get(topic); } + /** + * Get the list of partitions whose leader is this node + * @param nodeId The node id + * @return A list of partitions + */ + public List partitionsForNode(int nodeId) { + return this.partitionsByNode.get(nodeId); + } + @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 0e47ff3..d21aee4 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -28,6 +28,8 @@ public class Node { this.port = port; } + public static final Node Unknown = new Node(-1, null, -1); + /** * The node id of this node */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index f37ab77..cbccecc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -17,12 +17,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.*; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; @@ -34,11 +35,19 @@ import org.junit.Test; public class RecordAccumulatorTest { - private TopicPartition tp = new TopicPartition("test", 0); + private String topic = "test"; + private int partition1 = 0; + private int partition2 = 1; + private Node node = new Node(0, "localhost", 1111); + private TopicPartition tp1 = new TopicPartition(topic, partition1); + private TopicPartition tp2 = new TopicPartition(topic, partition2); + private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null); + private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null); private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); + private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2)); private Metrics metrics = new Metrics(time); @Test @@ -47,12 +56,12 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(now).size()); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size()); } - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); + List batches = accum.drain(cluster, asList(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -68,19 +77,19 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); - accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); } @Test public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size()); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size()); time.sleep(10); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); + List batches = accum.drain(cluster, asList(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -94,14 +103,14 @@ public class RecordAccumulatorTest { public void testPartialDrain() throws Exception { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time); int appends = 1024 / msgSize + 1; - List partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1)); + List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size()); + assertEquals("Partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); - List batches = accum.drain(partitions, 1024, 0); + List batches = accum.drain(cluster, asList(node), 1024, 0).get(node.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -109,7 +118,7 @@ public class RecordAccumulatorTest { public void testStressfulSituation() throws Exception { final int numThreads = 5; final int msgs = 10000; - final int numParts = 10; + final int numParts = 2; final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { @@ -117,7 +126,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); } catch (Exception e) { e.printStackTrace(); } @@ -130,12 +139,14 @@ public class RecordAccumulatorTest { int read = 0; long now = time.milliseconds(); while (read < numThreads * msgs) { - List tps = accum.ready(now); - List batches = accum.drain(tps, 5 * 1024, 0); - for (RecordBatch batch : batches) { - for (LogEntry entry : batch.records) - read++; - accum.deallocate(batch); + List nodes = accum.ready(cluster, now); + List batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id()); + if (batches != null) { + for (RecordBatch batch : batches) { + for (LogEntry entry : batch.records) + read++; + accum.deallocate(batch); + } } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java index 1df2266..c788e66 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java @@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; -import org.apache.kafka.common.utils.AbstractIterator; import org.junit.Test; public class AbstractIteratorTest { @@ -49,7 +44,7 @@ public class AbstractIteratorTest { @Test(expected = NoSuchElementException.class) public void testEmptyIterator() { - Iterator iter = new ListIterator(Arrays.asList()); + Iterator iter = new ListIterator(Collections.emptyList()); iter.next(); }