From f9d01580d4d251b9cc5a5d184aeaff1800458403 Mon Sep 17 00:00:00 2001 From: Alexander Pakulov Date: Mon, 8 Jun 2015 13:40:26 -0700 Subject: [PATCH] KAFKA-2232: Make MockProducer generic --- .../kafka/clients/consumer/MockConsumer.java | 182 --------------- .../kafka/clients/producer/MockProducer.java | 243 -------------------- .../kafka/clients/consumer/MockConsumerTest.java | 1 + .../kafka/clients/producer/MockProducerTest.java | 31 ++- .../java/org/apache/kafka/test/MockConsumer.java | 186 ++++++++++++++++ .../java/org/apache/kafka/test/MockProducer.java | 246 +++++++++++++++++++++ .../java/org/apache/kafka/test/MockSerializer.java | 1 - 7 files changed, 459 insertions(+), 431 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java create mode 100644 clients/src/test/java/org/apache/kafka/test/MockConsumer.java create mode 100644 clients/src/test/java/org/apache/kafka/test/MockProducer.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java deleted file mode 100644 index f50da82..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.kafka.clients.consumer.internals.SubscriptionState; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.MetricName; - -/** - * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not - * threadsafe - *
- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to - * communicate with. Failure to close the consumer after use will leak these resources. - */ -public class MockConsumer implements Consumer { - - private final Map> partitions; - private final SubscriptionState subscriptions; - private Map>> records; - private boolean closed; - - public MockConsumer() { - this.subscriptions = new SubscriptionState(); - this.partitions = new HashMap>(); - this.records = new HashMap>>(); - this.closed = false; - } - - @Override - public synchronized Set subscriptions() { - return this.subscriptions.assignedPartitions(); - } - - @Override - public synchronized void subscribe(String... topics) { - ensureNotClosed(); - for (String topic : topics) - this.subscriptions.subscribe(topic); - } - - @Override - public synchronized void subscribe(TopicPartition... partitions) { - ensureNotClosed(); - for (TopicPartition partition : partitions) - this.subscriptions.subscribe(partition); - } - - public synchronized void unsubscribe(String... topics) { - ensureNotClosed(); - for (String topic : topics) - this.subscriptions.unsubscribe(topic); - } - - public synchronized void unsubscribe(TopicPartition... partitions) { - ensureNotClosed(); - for (TopicPartition partition : partitions) - this.subscriptions.unsubscribe(partition); - } - - @Override - public synchronized ConsumerRecords poll(long timeout) { - ensureNotClosed(); - // update the consumed offset - for (Map.Entry>> entry : this.records.entrySet()) { - List> recs = entry.getValue(); - if (!recs.isEmpty()) - this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset()); - } - - ConsumerRecords copy = new ConsumerRecords(this.records); - this.records = new HashMap>>(); - return copy; - } - - public synchronized void addRecord(ConsumerRecord record) { - ensureNotClosed(); - TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - this.subscriptions.assignedPartitions().add(tp); - List> recs = this.records.get(tp); - if (recs == null) { - recs = new ArrayList>(); - this.records.put(tp, recs); - } - recs.add(record); - } - - @Override - public synchronized void commit(Map offsets, CommitType commitType) { - ensureNotClosed(); - for (Entry entry : offsets.entrySet()) - subscriptions.committed(entry.getKey(), entry.getValue()); - } - - @Override - public synchronized void commit(CommitType commitType) { - ensureNotClosed(); - commit(this.subscriptions.allConsumed(), commitType); - } - - @Override - public synchronized void seek(TopicPartition partition, long offset) { - ensureNotClosed(); - subscriptions.seek(partition, offset); - } - - @Override - public synchronized long committed(TopicPartition partition) { - ensureNotClosed(); - return subscriptions.committed(partition); - } - - @Override - public synchronized long position(TopicPartition partition) { - ensureNotClosed(); - return subscriptions.consumed(partition); - } - - @Override - public synchronized void seekToBeginning(TopicPartition... partitions) { - ensureNotClosed(); - throw new UnsupportedOperationException(); - } - - @Override - public synchronized void seekToEnd(TopicPartition... 
partitions) { - ensureNotClosed(); - throw new UnsupportedOperationException(); - } - - @Override - public Map metrics() { - ensureNotClosed(); - return Collections.emptyMap(); - } - - @Override - public synchronized List partitionsFor(String topic) { - ensureNotClosed(); - List parts = this.partitions.get(topic); - if (parts == null) - return Collections.emptyList(); - else - return parts; - } - - public synchronized void updatePartitions(String topic, List partitions) { - ensureNotClosed(); - this.partitions.put(topic, partitions); - } - - @Override - public synchronized void close() { - ensureNotClosed(); - this.closed = true; - } - - private void ensureNotClosed() { - if (this.closed) - throw new IllegalStateException("This consumer has already been closed."); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java deleted file mode 100644 index e66491c..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.producer; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; -import org.apache.kafka.clients.producer.internals.ProduceRequestResult; -import org.apache.kafka.common.*; - - -/** - * A mock of the producer interface you can use for testing code that uses Kafka. - *
- * By default this mock will synchronously complete each send call successfully. However it can be configured to allow - * the user to control the completion of the call and supply an optional error for the producer to throw. - */ -public class MockProducer implements Producer { - - private final Cluster cluster; - private final Partitioner partitioner = new DefaultPartitioner(); - private final List> sent; - private final Deque completions; - private boolean autoComplete; - private Map offsets; - - /** - * Create a mock producer - * - * @param cluster The cluster holding metadata for this producer - * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise - * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after - * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link - * java.util.concurrent.Future Future<RecordMetadata>} that is returned. - */ - public MockProducer(Cluster cluster, boolean autoComplete) { - this.cluster = cluster; - this.autoComplete = autoComplete; - this.offsets = new HashMap(); - this.sent = new ArrayList>(); - this.completions = new ArrayDeque(); - } - - /** - * Create a new mock producer with invented metadata the given autoComplete setting. - * - * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)} - */ - public MockProducer(boolean autoComplete) { - this(Cluster.empty(), autoComplete); - } - - /** - * Create a new auto completing mock producer - * - * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)} - */ - public MockProducer() { - this(true); - } - - /** - * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied. - * - * @see #history() - */ - @Override - public synchronized Future send(ProducerRecord record) { - return send(record, null); - } - - /** - * Adds the record to the list of sent records. 
- * - * @see #history() - */ - @Override - public synchronized Future send(ProducerRecord record, Callback callback) { - int partition = 0; - if (this.cluster.partitionsForTopic(record.topic()) != null) - partition = partition(record, this.cluster); - ProduceRequestResult result = new ProduceRequestResult(); - FutureRecordMetadata future = new FutureRecordMetadata(result, 0); - TopicPartition topicPartition = new TopicPartition(record.topic(), partition); - long offset = nextOffset(topicPartition); - Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback); - this.sent.add(record); - if (autoComplete) - completion.complete(null); - else - this.completions.addLast(completion); - return future; - } - - /** - * Get the next offset for this topic/partition - */ - private long nextOffset(TopicPartition tp) { - Long offset = this.offsets.get(tp); - if (offset == null) { - this.offsets.put(tp, 1L); - return 0L; - } else { - Long next = offset + 1; - this.offsets.put(tp, next); - return offset; - } - } - - public synchronized void flush() { - while (!this.completions.isEmpty()) - completeNext(); - } - - public List partitionsFor(String topic) { - return this.cluster.partitionsForTopic(topic); - } - - public Map metrics() { - return Collections.emptyMap(); - } - - @Override - public void close() { - } - - @Override - public void close(long timeout, TimeUnit timeUnit) { - } - - /** - * Get the list of sent records since the last call to {@link #clear()} - */ - public synchronized List> history() { - return new ArrayList>(this.sent); - } - - /** - * Clear the stored history of sent records - */ - public synchronized void clear() { - this.sent.clear(); - this.completions.clear(); - } - - /** - * Complete the earliest uncompleted call successfully. - * - * @return true if there was an uncompleted call to complete - */ - public synchronized boolean completeNext() { - return errorNext(null); - } - - /** - * Complete the earliest uncompleted call with the given error. - * - * @return true if there was an uncompleted call to complete - */ - public synchronized boolean errorNext(RuntimeException e) { - Completion completion = this.completions.pollFirst(); - if (completion != null) { - completion.complete(e); - return true; - } else { - return false; - } - } - - /** - * computes partition for given record. - */ - private int partition(ProducerRecord record, Cluster cluster) { - Integer partition = record.partition(); - if (partition != null) { - List partitions = cluster.partitionsForTopic(record.topic()); - int numPartitions = partitions.size(); - // they have given us a partition, use it - if (partition < 0 || partition >= numPartitions) - throw new IllegalArgumentException("Invalid partition given with record: " + partition - + " is not in the range [0..." 
- + numPartitions - + "]."); - return partition; - } - return this.partitioner.partition(record.topic(), null, record.key(), null, record.value(), cluster); - } - - - private static class Completion { - private final long offset; - private final RecordMetadata metadata; - private final ProduceRequestResult result; - private final Callback callback; - private final TopicPartition topicPartition; - - public Completion(TopicPartition topicPartition, - long offset, - RecordMetadata metadata, - ProduceRequestResult result, - Callback callback) { - this.metadata = metadata; - this.offset = offset; - this.result = result; - this.callback = callback; - this.topicPartition = topicPartition; - } - - public void complete(RuntimeException e) { - result.done(topicPartition, e == null ? offset : -1L, e); - if (callback != null) { - if (e == null) - callback.onCompletion(metadata, null); - else - callback.onCompletion(null, e); - } - } - } - -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 677edd3..a12b81c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import java.util.Iterator; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.test.MockConsumer; import org.junit.Test; public class MockConsumerTest { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 6372f1a..9c88509 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -17,14 +17,22 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.test.MockProducer; +import org.apache.kafka.test.MockSerializer; import org.junit.Test; public class MockProducerTest { @@ -34,23 +42,36 @@ public class MockProducerTest { @Test @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { - MockProducer producer = new MockProducer(true); + MockProducer producer = new MockProducer(true, new MockSerializer(), new MockSerializer()); ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes()); Future metadata = producer.send(record); assertTrue("Send should be immediately complete", metadata.isDone()); assertFalse("Send should be successful", isError(metadata)); assertEquals("Offset should be 0", 0L, metadata.get().offset()); assertEquals(topic, metadata.get().topic()); - assertEquals("We should have the record in our history", asList(record), producer.history()); + assertEquals("We should have the record in our history", singletonList(record), producer.history()); + producer.clear(); 
+        assertEquals("Clear should erase our history", 0, producer.history().size());
+    }
+
+    @Test
+    public void testPartitioner() throws Exception {
+        PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
+        PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
+        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1));
+        MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new StringSerializer(), new StringSerializer());
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
+        Future<RecordMetadata> metadata = producer.send(record);
+        assertEquals("Partition should be correct", 1, metadata.get().partition());
         producer.clear();
         assertEquals("Clear should erase our history", 0, producer.history().size());
     }
 
     @Test
     public void testManualCompletion() throws Exception {
-        MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+        MockProducer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>(topic, "key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>(topic, "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumer.java b/clients/src/test/java/org/apache/kafka/test/MockConsumer.java
new file mode 100644
index 0000000..b845a48
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumer.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.CommitType;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.MetricName;
+
+/**
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
+ * threadsafe </i>
+ * <p>
+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
+ * communicate with. Failure to close the consumer after use will leak these resources.
+ */
+public class MockConsumer<K, V> implements Consumer<K, V> {
+
+    private final Map<String, List<PartitionInfo>> partitions;
+    private final SubscriptionState subscriptions;
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean closed;
+
+    public MockConsumer() {
+        this.subscriptions = new SubscriptionState();
+        this.partitions = new HashMap<String, List<PartitionInfo>>();
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        this.closed = false;
+    }
+
+    @Override
+    public synchronized Set<TopicPartition> subscriptions() {
+        return this.subscriptions.assignedPartitions();
+    }
+
+    @Override
+    public synchronized void subscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.subscribe(topic);
+    }
+
+    @Override
+    public synchronized void subscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.subscribe(partition);
+    }
+
+    public synchronized void unsubscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.unsubscribe(topic);
+    }
+
+    public synchronized void unsubscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.unsubscribe(partition);
+    }
+
+    @Override
+    public synchronized ConsumerRecords<K, V> poll(long timeout) {
+        ensureNotClosed();
+        // update the consumed offset
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
+            List<ConsumerRecord<K, V>> recs = entry.getValue();
+            if (!recs.isEmpty())
+                this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+        }
+
+        ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        return copy;
+    }
+
+    public synchronized void addRecord(ConsumerRecord<K, V> record) {
+        ensureNotClosed();
+        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        this.subscriptions.assignedPartitions().add(tp);
+        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
+        if (recs == null) {
+            recs = new ArrayList<ConsumerRecord<K, V>>();
+            this.records.put(tp, recs);
+        }
+        recs.add(record);
+    }
+
+    @Override
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        ensureNotClosed();
+        for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+            subscriptions.committed(entry.getKey(), entry.getValue());
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType) {
+        ensureNotClosed();
+        commit(this.subscriptions.allConsumed(), commitType);
+    }
+
+    @Override
+    public synchronized void seek(TopicPartition partition, long offset) {
+        ensureNotClosed();
+        subscriptions.seek(partition, offset);
+    }
+
+    @Override
+    public synchronized long committed(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.committed(partition);
+    }
+
+    @Override
+    public synchronized long position(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.consumed(partition);
+    }
+
+    @Override
+    public synchronized void seekToBeginning(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void seekToEnd(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        ensureNotClosed();
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public synchronized List<PartitionInfo> partitionsFor(String topic) {
+        ensureNotClosed();
+        List<PartitionInfo> parts = this.partitions.get(topic);
+        if (parts == null)
+            return Collections.emptyList();
+        else
+            return parts;
+    }
+
+    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+        ensureNotClosed();
+        this.partitions.put(topic, partitions);
+    }
+
+    @Override
+    public synchronized void close() {
+        ensureNotClosed();
+        this.closed = true;
+    }
+
+    private void ensureNotClosed() {
+        if (this.closed)
+            throw new IllegalStateException("This consumer has already been closed.");
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducer.java b/clients/src/test/java/org/apache/kafka/test/MockProducer.java
new file mode 100644
index 0000000..6d3a8f1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducer.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
+import org.apache.kafka.common.*;
+import org.apache.kafka.common.serialization.Serializer;
+
+
+/**
+ * A mock of the producer interface you can use for testing code that uses Kafka.
+ * <p>
+ * By default this mock will synchronously complete each send call successfully. However it can be configured to allow
+ * the user to control the completion of the call and supply an optional error for the producer to throw.
+ */
+public class MockProducer<K, V> implements Producer<K, V> {
+
+    private final Cluster cluster;
+    private final Partitioner partitioner = new DefaultPartitioner();
+    private final List<ProducerRecord<K, V>> sent;
+    private final Deque<Completion> completions;
+    private boolean autoComplete;
+    private Map<TopicPartition, Long> offsets;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+
+    /**
+     * Create a mock producer
+     *
+     * @param cluster The cluster holding metadata for this producer
+     * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
+     *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
+     *        {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
+     *        java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
+     */
+    public MockProducer(Cluster cluster, boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this.cluster = cluster;
+        this.autoComplete = autoComplete;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.offsets = new HashMap<TopicPartition, Long>();
+        this.sent = new ArrayList<ProducerRecord<K, V>>();
+        this.completions = new ArrayDeque<Completion>();
+    }
+
+    /**
+     * Create a new mock producer with invented metadata and the given autoComplete setting.
+     *
+     * Equivalent to {@link #MockProducer(Cluster, boolean, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, keySerializer, valueSerializer)}
+     */
+    public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(Cluster.empty(), autoComplete, keySerializer, valueSerializer);
+    }
+
+    /**
+     * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
+     *
+     * @see #history()
+     */
+    @Override
+    public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+        return send(record, null);
+    }
+
+    /**
+     * Adds the record to the list of sent records.
+     *
+     * @see #history()
+     */
+    @Override
+    public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        int partition = 0;
+        if (this.cluster.partitionsForTopic(record.topic()) != null)
+            partition = partition(record, this.cluster);
+        ProduceRequestResult result = new ProduceRequestResult();
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
+        TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
+        long offset = nextOffset(topicPartition);
+        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
+        this.sent.add(record);
+        if (autoComplete)
+            completion.complete(null);
+        else
+            this.completions.addLast(completion);
+        return future;
+    }
+
+    /**
+     * Get the next offset for this topic/partition
+     */
+    private long nextOffset(TopicPartition tp) {
+        Long offset = this.offsets.get(tp);
+        if (offset == null) {
+            this.offsets.put(tp, 1L);
+            return 0L;
+        } else {
+            Long next = offset + 1;
+            this.offsets.put(tp, next);
+            return offset;
+        }
+    }
+
+    public synchronized void flush() {
+        while (!this.completions.isEmpty())
+            completeNext();
+    }
+
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return this.cluster.partitionsForTopic(topic);
+    }
+
+    public Map<MetricName, Metric> metrics() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+    }
+
+    /**
+     * Get the list of sent records since the last call to {@link #clear()}
+     */
+    public synchronized List<ProducerRecord<K, V>> history() {
+        return new ArrayList<ProducerRecord<K, V>>(this.sent);
+    }
+
+    /**
+     * Clear the stored history of sent records
+     */
+    public synchronized void clear() {
+        this.sent.clear();
+        this.completions.clear();
+    }
+
+    /**
+     * Complete the earliest uncompleted call successfully.
+     *
+     * @return true if there was an uncompleted call to complete
+     */
+    public synchronized boolean completeNext() {
+        return errorNext(null);
+    }
+
+    /**
+     * Complete the earliest uncompleted call with the given error.
+     *
+     * @return true if there was an uncompleted call to complete
+     */
+    public synchronized boolean errorNext(RuntimeException e) {
+        Completion completion = this.completions.pollFirst();
+        if (completion != null) {
+            completion.complete(e);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Computes the partition for the given record.
+     */
+    private int partition(ProducerRecord<K, V> record, Cluster cluster) {
+        Integer partition = record.partition();
+        String topic = record.topic();
+        if (partition != null) {
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            int numPartitions = partitions.size();
+            // they have given us a partition, use it
+            if (partition < 0 || partition >= numPartitions)
+                throw new IllegalArgumentException("Invalid partition given with record: " + partition
+                                                   + " is not in the range [0..."
+                                                   + numPartitions
+                                                   + "].");
+            return partition;
+        }
+        byte[] key = keySerializer.serialize(topic, record.key());
+        byte[] value = valueSerializer.serialize(topic, record.value());
+        return this.partitioner.partition(topic, null, key, null, value, cluster);
+    }
+
+    private static class Completion {
+        private final long offset;
+        private final RecordMetadata metadata;
+        private final ProduceRequestResult result;
+        private final Callback callback;
+        private final TopicPartition topicPartition;
+
+        public Completion(TopicPartition topicPartition,
+                          long offset,
+                          RecordMetadata metadata,
+                          ProduceRequestResult result,
+                          Callback callback) {
+            this.metadata = metadata;
+            this.offset = offset;
+            this.result = result;
+            this.callback = callback;
+            this.topicPartition = topicPartition;
+        }
+
+        public void complete(RuntimeException e) {
+            result.done(topicPartition, e == null ? offset : -1L, e);
+            if (callback != null) {
+                if (e == null)
+                    callback.onCompletion(metadata, null);
+                else
+                    callback.onCompletion(null, e);
+            }
+        }
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
index e75d2e4..0348258 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
@@ -31,7 +31,6 @@ public class MockSerializer implements Serializer<byte[]> {
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-
     }
 
     @Override
-- 
2.4.3
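
A few usage sketches follow for reviewers trying out the moved classes. They lean only on the API visible in the diff; topic names are illustrative, and the five-argument ConsumerRecord(topic, partition, offset, key, value) constructor is an assumption about this branch rather than something the patch shows. First, driving the relocated MockConsumer: seed records, poll them back, commit:

    @Test
    public void sketchConsumerUsage() {
        MockConsumer<String, String> consumer = new MockConsumer<String, String>();
        consumer.subscribe(new TopicPartition("test_topic", 0));
        // Seed a record; addRecord() also adds the partition to the assignment.
        consumer.addRecord(new ConsumerRecord<String, String>("test_topic", 0, 0L, "key", "value"));
        // poll() hands back everything queued so far and advances the consumed offsets.
        for (ConsumerRecord<String, String> record : consumer.poll(0))
            System.out.println(record.offset() + ": " + record.key() + "=" + record.value());
        consumer.commit(CommitType.SYNC);
        consumer.close();
    }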
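
With autoComplete == false the returned future stays incomplete until the test picks the outcome via completeNext() or errorNext(), which is what makes failure-path testing possible. A minimal sketch, sitting in a test class like MockProducerTest above and reusing MockSerializer (the byte[] test serializer from this package):

    @Test
    public void sketchManualCompletion() throws Exception {
        MockProducer<byte[], byte[]> producer =
            new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer());
        Future<RecordMetadata> md =
            producer.send(new ProducerRecord<byte[], byte[]>("test_topic", "key".getBytes(), "value".getBytes()));
        // md.isDone() is false here; nothing completes until the test says so.
        producer.completeNext();    // succeed the earliest outstanding send, or instead:
        // producer.errorNext(new RuntimeException("simulated failure"));
        assertEquals(0L, md.get().offset());
    }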
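
Finally, nextOffset() keeps a counter per TopicPartition, so repeated sends into the same partition receive consecutive offsets starting at 0 — useful to assert when test code depends on offsets. Against Cluster.empty() every send lands in partition 0, so this sketch (same JUnit static imports as the tests above) sees 0, 1, 2:

    @Test
    public void sketchOffsetCounting() throws Exception {
        MockProducer<byte[], byte[]> producer =
            new MockProducer<byte[], byte[]>(true, new MockSerializer(), new MockSerializer());
        ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<byte[], byte[]>("test_topic", "key".getBytes(), "value".getBytes());
        // No cluster metadata: partition defaults to 0, so one offset counter applies.
        assertEquals(0L, producer.send(record).get().offset());
        assertEquals(1L, producer.send(record).get().offset());
        assertEquals(2L, producer.send(record).get().offset());
    }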