diff --git itests/qtest-druid/pom.xml itests/qtest-druid/pom.xml
index 260e73d2b1..3b9c5c3ef7 100644
--- itests/qtest-druid/pom.xml
+++ itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
10.11.1.1
16.0.1
4.1.0
- 2.0.0
+ 2.0.0
@@ -206,18 +206,17 @@
guice
${druid.guice.version}
-
- junit
- junit
- ${junit.version}
- test
+ org.apache.kafka
+ kafka_2.11
+ ${kafka.test.version}
org.apache.kafka
- kafka_2.11
- ${kafka.version}
+ kafka-clients
+ ${kafka.test.version}
+
org.slf4j
slf4j-api
@@ -239,8 +238,6 @@
shade
-
-
false
false
@@ -261,6 +258,12 @@
*:jsp-api*
+
+
+ org.apache.kafka
+ org.apache.kafkatests
+
+
*:*
diff --git itests/qtest/pom.xml itests/qtest/pom.xml
index 801a43d02f..e19a1b296f 100644
--- itests/qtest/pom.xml
+++ itests/qtest/pom.xml
@@ -143,6 +143,16 @@
kafka-handler
${project.version}
test
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index f907e9ddf0..647b6a6ed0 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -30,7 +30,7 @@
..
- 2.0.0
+ 2.2.0
kafka-handler
@@ -94,7 +94,6 @@
test
test
-
org.apache.kafka
kafka_2.11
@@ -108,6 +107,12 @@
${kafka.version}
test
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.7
+ test
+
org.slf4j
slf4j-api
@@ -142,9 +147,15 @@
false
- org.apache.kafka:*
+ org.apache.kafka:kafka-clients
+
+
+ org.apache.kafka
+ org.apache.kafkaesque
+
+
*:*
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 2270e08e2c..ba27233f86 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -39,11 +39,12 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+
/**
* Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
@@ -107,8 +108,8 @@
kafkaProducer.close();
}
- @Override public void close(long timeout, TimeUnit unit) {
- kafkaProducer.close(timeout, unit);
+ @Override public void close(Duration duration) {
+ kafkaProducer.close(duration);
}
@Override public void flush() {
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index fbcbe9a19a..a79bf4fce9 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -56,8 +56,8 @@
@Override protected void before() throws Throwable {
// Start the ZK and the Broker
LOG.info("init embedded Zookeeper");
- zkServer = new EmbeddedZookeeper();
tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath();
+ zkServer = new EmbeddedZookeeper();
String zkConnect = "127.0.0.1:" + zkServer.port();
LOG.info("init kafka broker");
Properties brokerProps = new Properties();
@@ -91,7 +91,9 @@
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}
- zkServer.shutdown();
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
}
void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {