From 5f3e881e8f42fcec7c65acb61eccceffd7d3a3e5 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Sun, 26 Apr 2015 19:47:53 -0700 Subject: [PATCH 1/5] make MockMetricsReporter a little more generic --- .../java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 7 ++++--- .../java/org/apache/kafka/clients/producer/KafkaProducerTest.java | 7 ++++--- .../src/test/java/org/apache/kafka/test/MockMetricsReporter.java | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index eea2c28..738f3ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -33,13 +33,14 @@ public class KafkaConsumerTest { props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - MockMetricsReporter.CLOSE_COUNT.set(0); + final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaConsumer consumer = new KafkaConsumer( props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } catch (KafkaException e) { - Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get()); - MockMetricsReporter.CLOSE_COUNT.set(0); + Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); Assert.assertEquals("Failed to construct kafka consumer", e.getMessage()); return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 49f1427..a39453a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -34,13 +34,14 @@ public class KafkaProducerTest { props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - MockMetricsReporter.CLOSE_COUNT.set(0); + final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { KafkaProducer producer = new KafkaProducer( props, new ByteArraySerializer(), new ByteArraySerializer()); } catch (KafkaException e) { - Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get()); - MockMetricsReporter.CLOSE_COUNT.set(0); + Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); return; } diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java index 6f948f2..2a8fa1f 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java +++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class MockMetricsReporter implements MetricsReporter { + public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); public MockMetricsReporter() { @@ -32,7 +33,7 @@ public class MockMetricsReporter implements MetricsReporter { @Override public void init(List metrics) { - + INIT_COUNT.incrementAndGet(); } @Override -- 2.3.2 (Apple Git-55) From be7c5a89a1b762c729ad914f2c815da838ad3b26 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 28 Apr 2015 18:36:12 -0700 Subject: [PATCH 2/5] override java.io.Closeable$close method in Serializer and Deserializer interfaces without throwing checked IOException. this is to avoid breaking the source compatability. --- .../main/java/org/apache/kafka/common/serialization/Deserializer.java | 2 ++ .../src/main/java/org/apache/kafka/common/serialization/Serializer.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java index 9a57579..254b556 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java @@ -39,4 +39,6 @@ public interface Deserializer extends Closeable { */ public T deserialize(String topic, byte[] data); + @Override + public void close(); } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c440540..16a67a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -38,4 +38,6 @@ public interface Serializer extends Closeable { */ public byte[] serialize(String topic, T data); + @Override + public void close(); } -- 2.3.2 (Apple Git-55) From db223e68a045710d28d6b31fc3f2a82a935e15d6 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Fri, 1 May 2015 10:59:28 -0700 Subject: [PATCH 3/5] add a test for checking Serializer is closed during KafkaProducer#close --- .../kafka/clients/producer/KafkaProducerTest.java | 23 ++++++++++++++++- .../java/org/apache/kafka/test/MockSerializer.java | 30 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 clients/src/test/java/org/apache/kafka/test/MockSerializer.java diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index a39453a..85fee14 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.MockSerializer; import org.junit.Assert; import org.junit.Test; @@ -28,7 +29,7 @@ public class KafkaProducerTest { @Test - public void testConstructorClose() throws Exception { + public void testConstructorFailureCloseResource() throws Exception { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); @@ -47,4 +48,24 @@ public class KafkaProducerTest { } Assert.fail("should have caught an exception and returned"); } + + @Test + public void testSerializerClose() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + + final int oldInitCount = MockSerializer.INIT_COUNT.get(); + final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); + + KafkaProducer producer = new KafkaProducer( + props, new MockSerializer(), new MockSerializer()); + Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); + + producer.close(); + Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); + Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get()); + } } diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java new file mode 100644 index 0000000..3634084 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -0,0 +1,30 @@ +package org.apache.kafka.test; + +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockSerializer implements Serializer { + public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); + + public MockSerializer() { + INIT_COUNT.incrementAndGet(); + } + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public byte[] serialize(String topic, byte[] data) { + return data; + } + + @Override + public void close() { + CLOSE_COUNT.incrementAndGet(); + } +} -- 2.3.2 (Apple Git-55) From 2ebd8786f5420274a2f4cdd6b8c1113514431b82 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Fri, 1 May 2015 15:38:05 -0700 Subject: [PATCH 4/5] missing copyright header in previous checkin --- .../test/java/org/apache/kafka/test/MockSerializer.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java index 3634084..e75d2e4 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.test; import org.apache.kafka.common.serialization.Serializer; -- 2.3.2 (Apple Git-55) From a9f4c5890713a8d9368b59f68b35443a95a56be9 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Fri, 1 May 2015 15:41:27 -0700 Subject: [PATCH 5/5] remvoed "throws Exception" for test methods --- .../java/org/apache/kafka/clients/producer/KafkaProducerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 85fee14..f3f8334 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -29,7 +29,7 @@ public class KafkaProducerTest { @Test - public void testConstructorFailureCloseResource() throws Exception { + public void testConstructorFailureCloseResource() { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999"); @@ -50,7 +50,7 @@ public class KafkaProducerTest { } @Test - public void testSerializerClose() throws Exception { + public void testSerializerClose() { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); -- 2.3.2 (Apple Git-55)