From 5f3e881e8f42fcec7c65acb61eccceffd7d3a3e5 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Sun, 26 Apr 2015 19:47:53 -0700 Subject: [PATCH 1/2] make MockMetricsReporter a little more generic --- .../java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 7 ++++--- .../java/org/apache/kafka/clients/producer/KafkaProducerTest.java | 7 ++++--- .../src/test/java/org/apache/kafka/test/MockMetricsReporter.java | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index eea2c28..738f3ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -33,13 +33,14 @@ public class KafkaConsumerTest { props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - MockMetricsReporter.CLOSE_COUNT.set(0); + final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaConsumer consumer = new KafkaConsumer( props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } catch (KafkaException e) { - Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get()); - MockMetricsReporter.CLOSE_COUNT.set(0); + Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); Assert.assertEquals("Failed to construct kafka consumer", e.getMessage()); return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 49f1427..a39453a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -34,13 +34,14 @@ public class KafkaProducerTest { props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - MockMetricsReporter.CLOSE_COUNT.set(0); + final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaProducer producer = new KafkaProducer( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { - Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get()); - MockMetricsReporter.CLOSE_COUNT.set(0); + Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); return; } diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java index 6f948f2..2a8fa1f 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java +++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class MockMetricsReporter implements MetricsReporter { + public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); public MockMetricsReporter() { @@ -32,7 +33,7 @@ public class MockMetricsReporter implements MetricsReporter { @Override public void init(List metrics) { - + INIT_COUNT.incrementAndGet(); } @Override -- 2.3.2 (Apple Git-55) From be7c5a89a1b762c729ad914f2c815da838ad3b26 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 28 Apr 2015 18:36:12 -0700 Subject: [PATCH 2/2] override java.io.Closeable$close method in Serializer and Deserializer interfaces without throwing checked IOException. this is to avoid breaking the source compatability. --- .../main/java/org/apache/kafka/common/serialization/Deserializer.java | 2 ++ .../src/main/java/org/apache/kafka/common/serialization/Serializer.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java index 9a57579..254b556 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java @@ -39,4 +39,6 @@ public interface Deserializer extends Closeable { */ public T deserialize(String topic, byte[] data); + @Override + public void close(); } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c440540..16a67a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -38,4 +38,6 @@ public interface Serializer extends Closeable { */ public byte[] serialize(String topic, T data); + @Override + public void close(); } -- 2.3.2 (Apple Git-55)