diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
index 4f0ee94655..06a10b4b8b 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
@@ -89,7 +89,7 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
     LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
     this.recordsCursor =
         startOffset == endOffset ?
-        new KafkaRecordIterator.EmptyIterator() :
+        new EmptyIterator() :
         new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
     started = true;
   }
@@ -157,4 +157,17 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
       consumer.close();
     }
   }
+
+  /**
+   * Empty iterator used for empty splits when startOffset == endOffset; this avoids a clumsy if condition.
+   */
+  private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+    @Override public boolean hasNext() {
+      return false;
+    }
+
+    @Override public ConsumerRecord<byte[], byte[]> next() {
+      throw new IllegalStateException("this is an empty iterator");
+    }
+  }
 }
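For reference, a minimal runnable sketch of the empty-split cursor contract, using String as a stand-in element type (the real cursor yields ConsumerRecord<byte[], byte[]>; the class and variable names below are illustrative, not part of the patch):

```java
import java.util.Collections;
import java.util.Iterator;

final class EmptyCursorSketch {

  /** Same shape as the nested EmptyIterator added above. */
  private static final class EmptyIterator implements Iterator<String> {
    @Override public boolean hasNext() {
      return false;
    }

    @Override public String next() {
      // Signals caller misuse explicitly; an empty split should never be advanced.
      throw new IllegalStateException("this is an empty iterator");
    }
  }

  public static void main(String[] args) {
    long startOffset = 42L;
    long endOffset = 42L;
    // Mirrors the ternary in initialize(): an empty split gets a no-op cursor.
    Iterator<String> cursor = startOffset == endOffset
        ? new EmptyIterator()
        : Collections.singletonList("record").iterator();
    System.out.println("has records: " + cursor.hasNext()); // prints: has records: false
  }
}
```

One design note: java.util.Collections.emptyIterator() would serve the same purpose but throws NoSuchElementException on next(); the patch presumably prefers IllegalStateException to flag advancing an empty split as a caller bug rather than normal iterator exhaustion.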

diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index 7daa3e2544..c252455348 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -18,14 +18,13 @@
 package org.apache.hadoop.hive.kafka;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,108 +35,120 @@ import java.util.concurrent.TimeUnit;
 /**
- * Iterator over Kafka Records to read records from a single topic partition inclusive start exclusive end.
- * <br>
- * If {@code startOffset} is not null will seek up to that offset
- * Else If {@code startOffset} is null will seek to beginning see
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
- * <br>
- * When provided with an end offset it will return records up to the record with offset == endOffset - 1,
- * Else If end offsets is null it will read up to the current end see
- * {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(java.util.Collection)}
- * <br>
- * Current implementation of this Iterator will throw and exception if can not poll up to the endOffset - 1
+ * Iterator over Kafka Records to read records from a single topic partition, inclusive start, exclusive end.
  */
 public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {

   private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
+  private static final String POLL_TIMEOUT_HINT =
+      String.format("Try increasing poll timeout using the Hive table property [%s]",
+          KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT);
+  private static final String ERROR_POLL_TIMEOUT_FORMAT =
+      "Consumer returned [0] records due to exhausted poll timeout [%s]ms from TopicPartition:[%s], "
+          + "start offset [%s], current consumer position [%s], target end offset [%s]. "
+          + POLL_TIMEOUT_HINT;

   private final Consumer<byte[], byte[]> consumer;
   private final TopicPartition topicPartition;
-  private long endOffset;
-  private long startOffset;
+  private final long endOffset;
+  private final long startOffset;
   private final long pollTimeoutMs;
   private final Stopwatch stopwatch = Stopwatch.createUnstarted();
   private ConsumerRecords<byte[], byte[]> records;
-  private long currentOffset;
+  private long consumerPosition;
   private ConsumerRecord<byte[], byte[]> nextRecord;
   private boolean hasMore = true;
-  private final boolean started;
-
-  //Kafka consumer poll method return an iterator of records.
   private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;

   /**
-   * @param consumer functional kafka consumer
-   * @param topicPartition kafka topic partition
-   * @param startOffset start position of stream.
-   * @param endOffset requested end position. If null will read up to current last
-   * @param pollTimeoutMs poll time out in ms
+   * Iterator over Kafka Records over a single {@code topicPartition}, inclusive {@code startOffset},
+   * up to exclusive {@code endOffset}.
+   * <br>
+   * If {@code requestedStartOffset} is not null, it will seek to that offset;
+   * else if {@code requestedStartOffset} is null, it will seek to the beginning, see
+   * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}.
+   * <br>
+   * When provided with {@code requestedEndOffset}, it will return records up to consumer position == endOffset;
+   * else if {@code requestedEndOffset} is null, it will read up to the current end of the stream, see
+   * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)}.
+   * <br>
+   * @param consumer functional kafka consumer.
+   * @param topicPartition kafka topic partition.
+   * @param requestedStartOffset requested start position.
+   * @param requestedEndOffset requested end position; if null, will read up to the current last offset.
+   * @param pollTimeoutMs poll timeout in ms.
    */
   KafkaRecordIterator(Consumer<byte[], byte[]> consumer,
       TopicPartition topicPartition,
-      @Nullable Long startOffset,
-      @Nullable Long endOffset,
+      @Nullable Long requestedStartOffset,
+      @Nullable Long requestedEndOffset,
       long pollTimeoutMs) {
     this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
     this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
     this.pollTimeoutMs = pollTimeoutMs;
-    Preconditions.checkState(this.pollTimeoutMs > 0, "poll timeout has to be positive number");
-    this.startOffset = startOffset == null ? -1L : startOffset;
-    this.endOffset = endOffset == null ? -1L : endOffset;
-    assignAndSeek();
-    this.started = true;
-  }
-
-  KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
-    this(consumer, tp, null, null, pollTimeoutMs);
-  }
-
-  private void assignAndSeek() {
+    Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be a positive number");
+    final List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
     // assign topic partition to consumer
-    final List<TopicPartition> topicPartitionList = ImmutableList.of(topicPartition);
-    if (LOG.isTraceEnabled()) {
-      stopwatch.reset().start();
+    consumer.assign(topicPartitionList);
+
+    // resolve the end offset first, in case we have to seek to the end to figure out the last available offset
+    if (requestedEndOffset == null) {
+      consumer.seekToEnd(topicPartitionList);
+      this.endOffset = consumer.position(topicPartition);
+      LOG.info("End offset set to [{}]", this.endOffset);
+    } else {
+      this.endOffset = requestedEndOffset;
     }
-    consumer.assign(topicPartitionList);
-    // compute offsets and seek to start
-    if (startOffset > -1) {
-      LOG.info("Seeking to offset [{}] of topic partition [{}]", startOffset, topicPartition);
-      consumer.seek(topicPartition, startOffset);
+    // seek to the start offset
+    if (requestedStartOffset != null) {
+      LOG.info("Seeking to offset [{}] of topic partition [{}]", requestedStartOffset, topicPartition);
+      consumer.seek(topicPartition, requestedStartOffset);
+      this.startOffset = consumer.position(topicPartition);
+      if (this.startOffset != requestedStartOffset) {
+        LOG.warn("Current start offset [{}] is different from the requested start position [{}]",
+            this.startOffset,
+            requestedStartOffset);
+      }
     } else {
-      LOG.info("Seeking to beginning of topic partition [{}]", topicPartition);
+      // case: seek to the beginning of the stream
+      consumer.seekToBeginning(Collections.singleton(topicPartition));
       // seekToBeginning is lazy thus need to call position() or poll(0)
-      this.consumer.seekToBeginning(Collections.singleton(topicPartition));
-      startOffset = consumer.position(topicPartition);
-    }
-    if (endOffset == -1) {
-      this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition);
-      LOG.info("EndOffset set to {}", endOffset);
+      this.startOffset = consumer.position(topicPartition);
+      LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
+          topicPartition,
+          this.startOffset);
     }
-    currentOffset = consumer.position(topicPartition);
-    Preconditions.checkState(this.endOffset >= currentOffset,
-        "End offset [%s] need to be greater than start offset [%s]",
+
+    consumerPosition = consumer.position(topicPartition);
+    Preconditions.checkState(this.endOffset >= consumerPosition,
+        "End offset [%s] needs to be greater than or equal to start offset [%s]",
         this.endOffset,
-        currentOffset);
-    LOG.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset [{}]; endOffset [{}]",
+        consumerPosition);
+    LOG.info("Kafka Iterator assigned to TopicPartition [{}]; start offset [{}]; end offset [{}]",
         topicPartition,
-        currentOffset,
+        consumerPosition,
         this.endOffset);
-    if (LOG.isTraceEnabled()) {
-      stopwatch.stop();
-      LOG.trace("Time to assign and seek [{}] ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
-    }
+  }

-  @Override
-  public boolean hasNext() {
+  @VisibleForTesting KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
+    this(consumer, tp, null, null, pollTimeoutMs);
+  }
+
+  /**
+   * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset.
+   * @return true if there are more records to be consumed.
+   */
+  @Override public boolean hasNext() {
     /*
 Poll more records from Kafka queue IF:
-Initial poll case -> (records == null)
+Initial poll -> (records == null)
 OR
-Need to poll at least one more record (currentOffset + 1 < endOffset) AND consumerRecordIterator is empty (!hasMore)
+Need to poll at least one more record (consumerPosition < endOffset) AND consumerRecordIterator is empty (!hasMore)
 */
-    if (!hasMore && currentOffset + 1 < endOffset || records == null) {
+    if (!hasMore && consumerPosition < endOffset || records == null) {
       pollRecords();
       findNext();
     }
@@ -145,65 +156,49 @@ Need to poll at least one more record (currentOffset + 1 < endOffset) AND consum
   }

   /**
-   * Poll more records or Fail with {@link TimeoutException} if no records returned before reaching target end offset.
+   * Poll more records from the Kafka broker.
+   *
+   * @throws IllegalStateException if no records are returned before the consumer position reaches the target end offset.
    */
   private void pollRecords() {
     if (LOG.isTraceEnabled()) {
       stopwatch.reset().start();
     }
-    Preconditions.checkArgument(started);
     records = consumer.poll(pollTimeoutMs);
     if (LOG.isTraceEnabled()) {
       stopwatch.stop();
       LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
     }
     // Fail if we can not poll within one lap of pollTimeoutMs.
-    if (records.isEmpty() && currentOffset < endOffset) {
-      throw new TimeoutException(String.format("Current offset: [%s]-TopicPartition:[%s], target End offset:[%s]."
-          + "Consumer returned 0 record due to exhausted poll timeout [%s]ms, try increasing[%s]",
-          currentOffset,
-          topicPartition.toString(),
-          endOffset,
+    if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
+      throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
           pollTimeoutMs,
-          KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT));
+          topicPartition.toString(),
+          startOffset,
+          consumer.position(topicPartition),
+          endOffset));
     }
     consumerRecordIterator = records.iterator();
+    consumerPosition = consumer.position(topicPartition);
   }

   @Override public ConsumerRecord<byte[], byte[]> next() {
     ConsumerRecord<byte[], byte[]> value = nextRecord;
     Preconditions.checkState(value.offset() < endOffset);
     findNext();
-    return Preconditions.checkNotNull(value);
+    return value;
   }

   /**
-   * Find the next element in the batch of returned records by previous poll or set hasMore to false tp poll more next
-   * call to {@link KafkaRecordIterator#hasNext()}.
+   * Find the next element in the current batch, or set {@code hasMore = false} so that the next call to
+   * {@link KafkaRecordIterator#hasNext()} triggers {@link KafkaRecordIterator#pollRecords()}.
    */
   private void findNext() {
     if (consumerRecordIterator.hasNext()) {
       nextRecord = consumerRecordIterator.next();
-      hasMore = true;
-      if (nextRecord.offset() < endOffset) {
-        currentOffset = nextRecord.offset();
-        return;
-      }
-    }
-    hasMore = false;
-    nextRecord = null;
-  }
-
-  /**
-   * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition.
-   */
-  protected static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
-    @Override public boolean hasNext() {
-      return false;
-    }
-
-    @Override public ConsumerRecord<byte[], byte[]> next() {
-      throw new IllegalStateException("this is an empty iterator");
+      hasMore = nextRecord.offset() < endOffset;
+    } else {
+      hasMore = false;
+      nextRecord = null;
     }
   }
 }
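A note on the constructor's ordering, which the sketch below makes explicit: the end bound has to be resolved before seeking to the start, because seekToEnd moves the consumer's position, and both seekToEnd and seekToBeginning are lazy until position() forces the fetch. A minimal sketch against the Kafka consumer API (the class, method, and variable names are illustrative, not part of the patch):

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class SeekBoundsSketch {

  /** Returns {startOffset, endOffset} for a partition, resolving the end bound first. */
  static long[] resolveBounds(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
    final List<TopicPartition> tps = Collections.singletonList(tp);
    consumer.assign(tps);
    // seekToEnd is lazy; position() materializes the seek and yields the last offset.
    consumer.seekToEnd(tps);
    final long end = consumer.position(tp);
    // Seek back to the beginning; position() again forces the actual seek.
    consumer.seekToBeginning(tps);
    final long start = consumer.position(tp);
    return new long[] {start, end};
  }
}
```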
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index 98a5568f91..e048fb3197 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -38,7 +38,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
@@ -192,9 +191,13 @@ public KafkaRecordIteratorTest() {
     recordReader.close();
   }

-  @Test(expected = TimeoutException.class) public void testPullingBeyondLimit() {
-    this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 101L, POLL_TIMEOUT_MS);
-    this.compareIterator(RECORDS, this.kafkaRecordIterator);
+  @Test(expected = IllegalStateException.class) public void testPullingBeyondLimit() {
+    // FYI: in the Tx world, commits can introduce offset gaps, therefore using (RECORD_NUMBER + 1)
+    // as the beyond-limit offset is only valid if the topic has no Tx or any control messages.
+    this.kafkaRecordIterator =
+        new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 19383L, (long) RECORD_NUMBER + 1, POLL_TIMEOUT_MS);
+    this.compareIterator(RECORDS.stream()
+        .filter((consumerRecord) -> consumerRecord.offset() >= 19383L)
+        .collect(Collectors.toList()), this.kafkaRecordIterator);
   }

   @Test(expected = IllegalStateException.class) public void testPullingStartGreaterThanEnd() {
@@ -202,13 +205,13 @@ public KafkaRecordIteratorTest() {
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
   }

-  @Test(expected = TimeoutException.class) public void testPullingFromEmptyTopic() {
+  @Test(expected = IllegalStateException.class) public void testPullingFromEmptyTopic() {
     this.kafkaRecordIterator =
         new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 0), 0L, 100L, POLL_TIMEOUT_MS);
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
   }

-  @Test(expected = TimeoutException.class) public void testPullingFromEmptyPartition() {
+  @Test(expected = IllegalStateException.class) public void testPullingFromEmptyPartition() {
     this.kafkaRecordIterator =
         new KafkaRecordIterator(this.consumer, new TopicPartition(TOPIC, 1), 0L, 100L, POLL_TIMEOUT_MS);
     this.compareIterator(RECORDS, this.kafkaRecordIterator);
@@ -268,18 +271,19 @@ private void setupConsumer() {
     consumerProps.setProperty("enable.auto.commit", "false");
     consumerProps.setProperty("auto.offset.reset", "none");
     consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
-    this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
+    conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
     consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
     consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
     consumerProps.setProperty("request.timeout.ms", "3002");
     consumerProps.setProperty("fetch.max.wait.ms", "3001");
     consumerProps.setProperty("session.timeout.ms", "3001");
     consumerProps.setProperty("metadata.max.age.ms", "100");
+    consumerProps.setProperty("max.poll.records", String.valueOf(RECORD_NUMBER - 1));
     this.consumer = new KafkaConsumer<>(consumerProps);
   }

   private static void sendData() {
-    LOG.info("Sending {} records", RECORD_NUMBER);
+    LOG.info("Sending [{}] records", RECORDS.size());
     RECORDS.stream()
         .map(consumerRecord -> new ProducerRecord<>(consumerRecord.topic(),
             consumerRecord.partition(),
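For context on how the iterator is exercised end to end: the new max.poll.records property caps each poll() batch below the total record count, forcing hasNext() through its re-poll branch at least once before reaching endOffset. A hypothetical driver, assuming same-package access (the KafkaRecordIterator constructor is package-private); the broker address, topic, offsets, and timeout below are placeholders, not values from the patch:

```java
package org.apache.hadoop.hive.kafka;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class IteratorDriverSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // Small batches force hasNext() to poll more than once before endOffset.
    props.setProperty("max.poll.records", "100");
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // Read offsets [0, 1000) from a placeholder topic with a 10s poll timeout.
      KafkaRecordIterator iterator =
          new KafkaRecordIterator(consumer, new TopicPartition("test-topic", 0), 0L, 1000L, 10_000L);
      while (iterator.hasNext()) {
        ConsumerRecord<byte[], byte[]> record = iterator.next();
        System.out.printf("offset=%d value-bytes=%d%n", record.offset(),
            record.value() == null ? 0 : record.value().length);
      }
    } // an IllegalStateException here means the poll timeout expired before endOffset was reached
  }
}
```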