diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index 647b6a6ed0..02f5a27506 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -30,7 +30,7 @@

   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <kafka.version>2.2.0</kafka.version>
+    <kafka.version>2.3.0</kafka.version>
   </properties>

   <artifactId>kafka-handler</artifactId>
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index ba27233f86..9f85681c80 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -45,7 +45,6 @@
 import java.util.Properties;
 import java.util.concurrent.Future;

-
 /**
  * Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
  * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
@@ -133,15 +132,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
-    Object nextSequence = getValue(transactionManager, "nextSequence");
-    Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
-
+    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(nextSequence, "clear");
-    invoke(lastAckedSequence, "clear");
-
+    invoke(topicPartitionBookkeeper, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index d87f245776..c3ddbb543d 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -302,7 +302,7 @@ private String getQueryId() {
       RetryUtils.retry(buildProducersTask, isRetrayable, cleanUpTheMap, maxTries, "Error while Builing Producers");
     } catch (Exception e) {
       // Can not go further
-      LOG.error("Can not fetch build produces due [{}]", e.getMessage());
+      LOG.error("Can not fetch build produces due [{}]", e);
       throw new MetaException(e.getMessage());
     }
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index fb4d034b2e..e4015fcbba 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -46,6 +46,7 @@
 import javax.annotation.Nullable;

 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -63,6 +64,7 @@

   private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
   private static final String TRANSACTION_DIR = "transaction_states";
+  private static final Duration DURATION_0 = Duration.ofMillis(0);

   private final String topic;
   private final HiveKafkaProducer producer;
@@ -178,7 +180,7 @@ private void logHints(Exception e) {
     } catch (Exception e) {
       LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
     }
-    producer.close(0, TimeUnit.MILLISECONDS);
+    producer.close(DURATION_0);
     return;
   }

@@ -209,11 +211,11 @@ private void logHints(Exception e) {
       persistTxState();
     }
     checkExceptions();
-    producer.close();
     LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
         producer.getTransactionalId(),
         sentRecords,
         topic);
+    producer.close(Duration.ZERO);
   }

   private void commitTransaction() {
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index db2515c8ed..8c9ed5f99b 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -38,6 +38,7 @@
 import org.slf4j.LoggerFactory;

 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,16 +57,17 @@

   private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
   private static final int RECORD_NUMBER = 17384;
-  private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+  private static final byte[] KEY_BYTES = "KEY".getBytes(StandardCharsets.UTF_8);
   private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
   private static final String TOPIC = "test-tx-producer";
   private static final List<ProducerRecord<byte[], byte[]>> RECORDS =
       IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
-        final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+        final byte[] value = ("VALUE-" + number).getBytes(StandardCharsets.UTF_8);
         return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
       }).collect(Collectors.toList());
+  private static final short MAX_ATTEMPTS = 500;

   @BeforeClass public static void setupCluster() throws Throwable {
     KAFKA_BROKER_RESOURCE.before();
@@ -109,7 +111,8 @@
     consumer = null;
   }

-  @Test public void resumeTransaction() {
+
+  @Test(timeout = 120_000) public void resumeTransaction() {
     producer.initTransactions();
     producer.beginTransaction();
     long pid = producer.getProducerId();
@@ -119,7 +122,7 @@
     //noinspection unchecked
     RECORDS.forEach(producer::send);
     producer.flush();
-    producer.close();
+    producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, epoch);
@@ -132,11 +135,15 @@
     consumer.seekToBeginning(assignment);
     long numRecords = 0;
     @SuppressWarnings("unchecked") final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList();
-    while (numRecords < RECORD_NUMBER) {
-      ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
+    short attempts = 0;
+    while (numRecords < RECORD_NUMBER && attempts++ < MAX_ATTEMPTS) {
+      ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1_000));
       actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
       numRecords += consumerRecords.count();
     }
+    if(attempts >= MAX_ATTEMPTS) {
+      Assert.fail("Reached max attempts and total number of records is " + numRecords);
+    }
     Assert.assertEquals("Size matters !!", RECORDS.size(), actualRecords.size());
     Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
     Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
@@ -152,28 +159,28 @@
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }

   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
     producer.initTransactions();
     producer.beginTransaction();
     long pid = producer.getProducerId();
-    producer.close();
+    producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }

   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
     producer.initTransactions();
     producer.beginTransaction();
     short epoch = producer.getEpoch();
-    producer.close();
+    producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
     secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
-    secondProducer.close();
+    secondProducer.close(Duration.ZERO);
   }
 }
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index a79bf4fce9..287fff4cb2 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -82,11 +82,6 @@
    */
   @Override protected void after() {
     super.after();
-    try {
-      FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
-    } catch (IOException e) {
-      LOG.error("Error cleaning " + tmpLogDir.toString(), e);
-    }
     if (kafkaServer != null) {
       kafkaServer.shutdown();
       kafkaServer.awaitShutdown();
@@ -94,6 +89,11 @@
     if (zkServer != null) {
       zkServer.shutdown();
     }
+    try {
+      FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+    } catch (IOException e) {
+      LOG.warn("did not clean " + tmpLogDir.toString(), e);
+    }
   }

   void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
index 86ed8661c7..7c9ca377ac 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
@@ -145,7 +145,7 @@ public TransactionalKafkaWriterTest() throws IOException {
   @After public void tearAfterTest() {
     KAFKA_BROKER_RESOURCE.deleteTopic(TOPIC);
-    consumer.close();
+    consumer.close(Duration.ZERO);
     consumer = null;
   }

@@ -229,7 +229,7 @@ private void checkData() {
     long numRecords = 0;
     boolean emptyPoll = false;
     while (numRecords < RECORD_NUMBER && !emptyPoll) {
-      ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+      ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
       Assert.assertFalse(records.records(new TopicPartition(TOPIC, 0))
           .stream()
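
The resumeTransaction() change in HiveKafkaProducer above tracks a Kafka internals change:
Kafka 2.3 removed the TransactionManager nextSequence and lastAckedSequence maps and folded
that per-partition state into a single TopicPartitionBookkeeper, which is why the two clear()
reflection calls become one reset() call. The sketch below illustrates the same technique with
plain java.lang.reflect instead of HiveKafkaProducer's private getValue/invoke/setValue helpers.
It is a minimal sketch, assuming Kafka 2.3.x internals (the field names transactionManager,
topicPartitionBookkeeper, and producerIdAndEpoch are private Kafka API and can change between
releases, which is exactly what this patch compensates for), and it omits the
transitionTo(INITIALIZING) state transition the real code performs before resetting.

import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.kafka.clients.producer.KafkaProducer;

final class ResumeTransactionSketch {

  // Continue a transaction started in another process: clear the local
  // per-partition bookkeeping, then install the foreign producerId/epoch.
  static void resume(KafkaProducer<byte[], byte[]> producer, long producerId, short epoch) throws Exception {
    // KafkaProducer keeps its transactional state in a private field.
    Object transactionManager = readField(producer, "transactionManager");

    // Kafka 2.3: a single reset() on TopicPartitionBookkeeper replaces the
    // clear() calls on the nextSequence/lastAckedSequence maps of Kafka 2.2.
    Object bookkeeper = readField(transactionManager, "topicPartitionBookkeeper");
    Method reset = bookkeeper.getClass().getDeclaredMethod("reset");
    reset.setAccessible(true);
    reset.invoke(bookkeeper);

    // Overwrite the producer id and epoch so the broker treats this producer
    // as the continuation of the original transaction.
    Object producerIdAndEpoch = readField(transactionManager, "producerIdAndEpoch");
    writeField(producerIdAndEpoch, "producerId", producerId);
    writeField(producerIdAndEpoch, "epoch", epoch);
  }

  private static Object readField(Object target, String name) throws Exception {
    Field field = target.getClass().getDeclaredField(name);
    field.setAccessible(true);
    return field.get(target);
  }

  private static void writeField(Object target, String name, Object value) throws Exception {
    Field field = target.getClass().getDeclaredField(name);
    field.setAccessible(true);
    field.set(target, value);
  }
}

The resumeTransaction() test in HiveKafkaProducerTest above exercises the real helper end to
end: the first producer sends and closes without committing, then a second producer resumes
with the saved producerId/epoch and can commit the same transaction.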