diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java index b0db0ad055..3a1ef0d166 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -120,7 +121,7 @@ private void handleKafkaException(KafkaException kafkaException) { @Override public void close(boolean abort) throws IOException { if (abort) { LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId); - producer.close(0, TimeUnit.MICROSECONDS); + producer.close(Duration.ofMillis(0L)); return; } else { LOG.info("Flushing Kafka Producer with writerId [{}]", writerId); @@ -159,7 +160,7 @@ private void handleKafkaException(KafkaException kafkaException) { private void checkExceptions() throws IOException { if (sendExceptionRef.get() != null) { LOG.error("Send Exception Aborting write from writerId [{}]", writerId); - producer.close(0, TimeUnit.MICROSECONDS); + producer.close(Duration.ofMillis(0L)); throw new IOException(sendExceptionRef.get()); } } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java index fb4d034b2e..73a18b86c2 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java @@ -46,6 +46,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -124,7 +125,7 @@ producer.abortTransaction(); } LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage()); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0L)); throw exception; } writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic); @@ -149,7 +150,7 @@ // producer.send() may throw a KafkaException which wraps a FencedException re throw its wrapped inner cause. producer.abortTransaction(); } - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0L)); sendExceptionRef.compareAndSet(null, e); checkExceptions(); } @@ -178,7 +179,7 @@ private void logHints(Exception e) { } catch (Exception e) { LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage()); } - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0L)); return; } @@ -276,7 +277,7 @@ private void checkExceptions() throws IOException { producer.abortTransaction(); } LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage()); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0L)); throw new IOException(exception); } }