diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 45db07b..2ebd72a 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -125,12 +125,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs) - /** Use shallow iterator over compressed messages directly. This feature should be used very carefully. - * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the - * overhead of decompression. - * */ - val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false) - /** * Client id is specified by the kafka consumer client, used to distinguish different clients */ diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 746a4bd..a504534 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean, val clientId: String) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { @@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(cdcFetchOffset) } - localCurrent = - if (enableShallowIterator) - currentDataChunk.messages.shallowIterator - else - currentDataChunk.messages.iterator + localCurrent = currentDataChunk.messages.iterator current.set(localCurrent) } diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index d4e0e96..31eaf86 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean, val clientId: String) extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { private val iter: ConsumerIterator[K,V] = - new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId) + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId) /** * Create an iterator over messages in the stream. diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index b266f3f..dcbcf21 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( - queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList @@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.consumerTimeoutMs, keyDecoder, valueDecoder, - config.shallowIteratorEnable, config.clientId) (queue, stream) }).toList diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 8ae30ea..1ee34b9 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { consumerConfig.consumerTimeoutMs, new StringDecoder(), new StringDecoder(), - enableShallowIterator = false, clientId = "") val receivedMessages = (0 until 5).map(i => iter.next.message).toList