diff --git itests/qtest-druid/pom.xml itests/qtest-druid/pom.xml index 260e73d2b1..3b9c5c3ef7 100644 --- itests/qtest-druid/pom.xml +++ itests/qtest-druid/pom.xml @@ -43,7 +43,7 @@ 10.11.1.1 16.0.1 4.1.0 - 2.0.0 + 2.0.0 @@ -206,18 +206,17 @@ guice ${druid.guice.version} - - junit - junit - ${junit.version} - test + org.apache.kafka + kafka_2.11 + ${kafka.test.version} org.apache.kafka - kafka_2.11 - ${kafka.version} + kafka-clients + ${kafka.test.version} + org.slf4j slf4j-api @@ -239,8 +238,6 @@ shade - - false false @@ -261,6 +258,12 @@ *:jsp-api* + + + org.apache.kafka + org.apache.kafkatests + + *:* diff --git itests/qtest/pom.xml itests/qtest/pom.xml index 801a43d02f..e19a1b296f 100644 --- itests/qtest/pom.xml +++ itests/qtest/pom.xml @@ -143,6 +143,16 @@ kafka-handler ${project.version} test + + + org.apache.kafka + kafka-clients + + + org.apache.hadoop + hadoop-client + + diff --git kafka-handler/pom.xml kafka-handler/pom.xml index f907e9ddf0..647b6a6ed0 100644 --- kafka-handler/pom.xml +++ kafka-handler/pom.xml @@ -30,7 +30,7 @@ .. - 2.0.0 + 2.2.0 kafka-handler @@ -94,7 +94,6 @@ test test - org.apache.kafka kafka_2.11 @@ -108,6 +107,12 @@ ${kafka.version} test + + org.apache.zookeeper + zookeeper + 3.4.7 + test + org.slf4j slf4j-api @@ -142,9 +147,15 @@ false - org.apache.kafka:* + org.apache.kafka:kafka-clients + + + org.apache.kafka + org.apache.kafkaesque + + *:* diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java index 2270e08e2c..ba27233f86 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java @@ -39,11 +39,12 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; + /** * Kafka Producer with public methods to extract the producer state then resuming transaction in another process. @@ -107,8 +108,8 @@ kafkaProducer.close(); } - @Override public void close(long timeout, TimeUnit unit) { - kafkaProducer.close(timeout, unit); + @Override public void close(Duration duration) { + kafkaProducer.close(duration); } @Override public void flush() { diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java index fbcbe9a19a..a79bf4fce9 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java @@ -56,8 +56,8 @@ @Override protected void before() throws Throwable { // Start the ZK and the Broker LOG.info("init embedded Zookeeper"); - zkServer = new EmbeddedZookeeper(); tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath(); + zkServer = new EmbeddedZookeeper(); String zkConnect = "127.0.0.1:" + zkServer.port(); LOG.info("init kafka broker"); Properties brokerProps = new Properties(); @@ -91,7 +91,9 @@ kafkaServer.shutdown(); kafkaServer.awaitShutdown(); } - zkServer.shutdown(); + if (zkServer != null) { + zkServer.shutdown(); + } } void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {