Details
- Improvement
- Status: Closed
- Major
- Resolution: Not A Bug
- 2.0.2
- None
- None
Description
I apologize for the earlier description, which wasn't very clear about the issue. Here is a detailed description of the problem and my use case:
I have a Spark application that consumes Kafka messages using the Spark Kafka 0.10 integration. I need to stop the application, after which the user specifies a timestamp in the past from which the application should start reading messages again (that is, replay messages). The timestamp is mapped to a Kafka offset using the 'offsetsForTimes' API of KafkaConsumer, which was introduced in the Kafka 0.10.1.0 client. That offset is then used to create the DStream.
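As a rough illustration of the timestamp-to-offset mapping described above, here is a minimal self-contained Scala sketch. It does not call Kafka: `PartitionId`, `OffsetsForTimesSketch`, `offsetForTime` and `startingOffsets` are hypothetical names, and each partition is modeled as an in-memory vector of (offset, timestamp) pairs. It assumes the documented semantics of 'offsetsForTimes' (the earliest offset whose timestamp is greater than or equal to the target) and, as an assumption of this sketch only, falls back to the partition's end offset when no record qualifies.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the Kafka class).
final case class PartitionId(topic: String, partition: Int)

object OffsetsForTimesSketch {
  // Hypothetical in-memory partition log: (offset, timestampMs) pairs in offset order.
  type Log = Vector[(Long, Long)]

  // Models the documented offsetsForTimes semantics: the earliest offset whose
  // timestamp is >= targetMs, or None when no such record exists.
  def offsetForTime(log: Log, targetMs: Long): Option[Long] =
    log.collectFirst { case (offset, ts) if ts >= targetMs => offset }

  // Returns a starting offset for EVERY partition. Partitions with no record at or
  // after targetMs fall back to their end offset (last offset + 1, or 0 if empty),
  // which is an assumption of this sketch, not Kafka behavior.
  def startingOffsets(logs: Map[PartitionId, Log], targetMs: Long): Map[PartitionId, Long] =
    logs.map { case (tp, log) =>
      val endOffset = log.lastOption.map(_._1 + 1).getOrElse(0L)
      tp -> offsetForTime(log, targetMs).getOrElse(endOffset)
    }
}
```

In a real application, the equivalent input would be the java.util.Map[TopicPartition, OffsetAndTimestamp] returned by KafkaConsumer.offsetsForTimes, whose value is null for a partition with no matching message; converting that to a Map[TopicPartition, Long] with an entry per partition gives the shape that Subscribe accepts.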
Because the Kafka 0.10.0.1 client does not have the 'offsetsForTimes' API, I need to use the Kafka 0.10.1.0 client.
To get this behavior, I replaced the 0.10.0.1 Kafka jar in the Spark environment with the 0.10.1.0 jar. The application then started working, but it read messages only from the first partition.
To reproduce the issue, I wrote a local program with the 0.10.1.0 jar on the classpath:
********************************
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// ssc (StreamingContext) and kafkaParams (Map[String, Object]) are defined elsewhere
val topics = Set("Z1Topic")
// Offsets hardcoded to 10 instead of being looked up with 'offsetsForTimes'
val topicPartitionOffsetMap = Map(
  new TopicPartition("Z1Topic", 0) -> 10L,
  new TopicPartition("Z1Topic", 1) -> 10L)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferBrokers, Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
stream.map(record => record.value()).print()
********************************
This printed only the messages from the first partition, starting at offset 10 (with the 0.10.1.0 client).
If I use the Kafka 0.10.0.1 client for the same program, everything works and I receive messages from all partitions, but then I can't use the 'offsetsForTimes' API because it doesn't exist in the 0.10.0.1 client.
Issue Links
- is related to: SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0 (Resolved)