diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 2dad20e..17e2c6e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -74,6 +74,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) var producer = new KafkaProducer(props) + val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") try { // create topic @@ -93,7 +94,6 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK } // make sure the fetched message count match - val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size) @@ -109,6 +109,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK producer.close() producer = null } + if (consumer != null) + consumer.close() } } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 24125e2..839fd27 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -92,9 +92,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness } override def tearDown() { - server1.shutdown; Utils.rm(server1.config.logDirs) - server2.shutdown; Utils.rm(server2.config.logDirs) - consumer1.close consumer2.close @@ -103,6 +100,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness if (producer3 != null) producer3.close if (producer4 != null) producer4.close + server1.shutdown; Utils.rm(server1.config.logDirs) + server2.shutdown; Utils.rm(server2.config.logDirs) + super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 1b11eb6..63ebefc 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -253,7 +253,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { private def produceMessage(topic: String, message: String) = { val props = new Properties() props.put("request.required.acks", String.valueOf(-1)) - val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs), + val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder(), props) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) producer.close() diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index c1219a8..dc6a5ea 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -76,6 +76,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ override def tearDown() { // restore set request handler logger to a higher level requestHandlerLogger.setLevel(Level.ERROR) + + if (consumer1 != null) + consumer1.close() + if (consumer2 != null) + consumer2.close() + server1.shutdown server2.shutdown Utils.rm(server1.config.logDirs)