diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index 647b6a6ed0..02f5a27506 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -30,7 +30,7 @@
  <properties>
    <hive.path.to.root>..</hive.path.to.root>
-   <kafka.version>2.2.0</kafka.version>
+   <kafka.version>2.3.0</kafka.version>
  </properties>
  <artifactId>kafka-handler</artifactId>
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index ba27233f86..9f85681c80 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -45,7 +45,6 @@
import java.util.Properties;
import java.util.concurrent.Future;
-
/**
* Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
* This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
@@ -133,15 +132,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
- Object nextSequence = getValue(transactionManager, "nextSequence");
- Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
-
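+ // Kafka 2.3 replaced the nextSequence/lastAckedSequence maps removed above with a
+ // single TopicPartitionBookkeeper inside TransactionManager; its reset() below
+ // clears the same per-partition state this reflection used to clear map by map.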
+ Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
invoke(transactionManager,
"transitionTo",
getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(nextSequence, "clear");
- invoke(lastAckedSequence, "clear");
-
+ invoke(topicPartitionBookkeeper, "reset");
Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
setValue(producerIdAndEpoch, "producerId", producerId);
setValue(producerIdAndEpoch, "epoch", epoch);
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index d87f245776..c3ddbb543d 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -302,7 +302,7 @@ private String getQueryId() {
RetryUtils.retry(buildProducersTask, isRetrayable, cleanUpTheMap, maxTries, "Error while Builing Producers");
} catch (Exception e) {
// Can not go further
- LOG.error("Can not fetch build produces due [{}]", e.getMessage());
+ LOG.error("Can not fetch build produces due [{}]", e);
throw new MetaException(e.getMessage());
}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index fb4d034b2e..e4015fcbba 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -46,6 +46,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -63,6 +64,7 @@
private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
private static final String TRANSACTION_DIR = "transaction_states";
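+ // Duration.ofMillis(0) is equivalent to Duration.ZERO; named constant for the
+ // non-blocking close used on the abort path below.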
+ private static final Duration DURATION_0 = Duration.ofMillis(0);
private final String topic;
private final HiveKafkaProducer producer;
@@ -178,7 +180,7 @@ private void logHints(Exception e) {
} catch (Exception e) {
LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
}
- producer.close(0, TimeUnit.MILLISECONDS);
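+ // close(long, TimeUnit) is deprecated in newer Kafka clients; close(Duration) is the replacement.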
+ producer.close(DURATION_0);
return;
}
@@ -209,11 +211,11 @@ private void logHints(Exception e) {
persistTxState();
}
checkExceptions();
- producer.close();
LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
producer.getTransactionalId(),
sentRecords,
topic);
+ producer.close(Duration.ZERO);
}
private void commitTransaction() {
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index db2515c8ed..8c9ed5f99b 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,16 +57,17 @@
private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
private static final int RECORD_NUMBER = 17384;
- private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+ private static final byte[] KEY_BYTES = "KEY".getBytes(StandardCharsets.UTF_8);
private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
private static final String TOPIC = "test-tx-producer";
private static final List<ProducerRecord<byte[], byte[]>>
RECORDS =
IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
- final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+ final byte[] value = ("VALUE-" + number).getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
}).collect(Collectors.toList());
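+ // Bounds the poll loop in resumeTransaction() so a broken transaction fails the
+ // test quickly instead of hanging it.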
+ private static final short MAX_ATTEMPTS = 500;
@BeforeClass public static void setupCluster() throws Throwable {
KAFKA_BROKER_RESOURCE.before();
@@ -109,7 +111,8 @@
consumer = null;
}
- @Test public void resumeTransaction() {
+
+ @Test(timeout = 120_000) public void resumeTransaction() {
producer.initTransactions();
producer.beginTransaction();
long pid = producer.getProducerId();
@@ -119,7 +122,7 @@
//noinspection unchecked
RECORDS.forEach(producer::send);
producer.flush();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer<byte[], byte[]> secondProducer = new HiveKafkaProducer<>(producerProperties);
secondProducer.resumeTransaction(pid, epoch);
@@ -132,11 +135,15 @@
consumer.seekToBeginning(assignment);
long numRecords = 0;
@SuppressWarnings("unchecked") final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList();
- while (numRecords < RECORD_NUMBER) {
- ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+ short attempts = 0;
+ while (numRecords < RECORD_NUMBER && attempts++ < MAX_ATTEMPTS) {
+ ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1_000));
actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
numRecords += consumerRecords.count();
}
+ if (numRecords < RECORD_NUMBER) {
+   Assert.fail("Reached max attempts (" + MAX_ATTEMPTS + ") and only consumed " + numRecords + " records");
+ }
Assert.assertEquals("Size matters !!", RECORDS.size(), actualRecords.size());
Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
@@ -152,28 +159,28 @@
HiveKafkaProducer<byte[], byte[]> secondProducer = new HiveKafkaProducer<>(producerProperties);
secondProducer.resumeTransaction(3434L, (short) 12);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
producer.initTransactions();
producer.beginTransaction();
long pid = producer.getProducerId();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer<byte[], byte[]> secondProducer = new HiveKafkaProducer<>(producerProperties);
secondProducer.resumeTransaction(pid, (short) 12);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
@Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
producer.initTransactions();
producer.beginTransaction();
short epoch = producer.getEpoch();
- producer.close();
+ producer.close(Duration.ZERO);
HiveKafkaProducer<byte[], byte[]> secondProducer = new HiveKafkaProducer<>(producerProperties);
secondProducer.resumeTransaction(45L, epoch);
secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
- secondProducer.close();
+ secondProducer.close(Duration.ZERO);
}
}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index a79bf4fce9..287fff4cb2 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -82,11 +82,6 @@
*/
@Override protected void after() {
super.after();
- try {
- FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
- } catch (IOException e) {
- LOG.error("Error cleaning " + tmpLogDir.toString(), e);
- }
if (kafkaServer != null) {
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
@@ -94,6 +89,11 @@
if (zkServer != null) {
zkServer.shutdown();
}
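+ // Delete the log directory only after the broker and ZooKeeper have shut down;
+ // deleting it first raced with the still-running broker.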
+ try {
+ FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+ } catch (IOException e) {
+ LOG.warn("did not clean " + tmpLogDir.toString(), e);
+ }
}
void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
index 86ed8661c7..7c9ca377ac 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/TransactionalKafkaWriterTest.java
@@ -145,7 +145,7 @@ public TransactionalKafkaWriterTest() throws IOException {
@After public void tearAfterTest() {
KAFKA_BROKER_RESOURCE.deleteTopic(TOPIC);
- consumer.close();
+ consumer.close(Duration.ZERO);
consumer = null;
}
@@ -229,7 +229,7 @@ private void checkData() {
long numRecords = 0;
boolean emptyPoll = false;
while (numRecords < RECORD_NUMBER && !emptyPoll) {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
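+ // A longer poll timeout gives slow test brokers time to expose committed records.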
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10_000));
Assert.assertFalse(records.records(new TopicPartition(TOPIC, 0))
.stream()