Description
SPARK-8389 added offsetRanges to Kafka direct streams. And SPARK-10122 fixed the issue of not ending up with non-Kafka RDDs when chaining transforms to Kafka RDDs. It appears that this issue remains for the case where a streaming application using Kafka direct streams is initialized from the checkpoint directory. The following is a representative example where everything works as expected during the first run, but exceptions are thrown on a subsequent run when the context is being initialized from the checkpoint directory.
from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils def attach_kafka_metadata(kafka_rdd): offset_ranges = kafka_rdd.offsetRanges() return kafka_rdd def create_context(): sc = SparkContext(appName='kafka-test') ssc = StreamingContext(sc, 10) ssc.checkpoint(CHECKPOINT_URI) kafka_stream = KafkaUtils.createDirectStream( ssc, [TOPIC], kafkaParams={ 'metadata.broker.list': BROKERS, }, ) kafka_stream.transform(attach_kafka_metadata).count().pprint() return ssc if __name__ == "__main__": ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context) ssc.start() ssc.awaitTermination()
Traceback (most recent call last): File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call r = self.func(t, *rdds) File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda> File "/home/spark/batch/test_checkpoint.py", line 12, in attach_kafka_metadata offset_ranges = kafka_rdd.offsetRanges() AttributeError: 'RDD' object has no attribute 'offsetRanges'
Attachments
Issue Links
- is related to
-
SPARK-8389 Expose KafkaRDDs offsetRange in Python
- Resolved
-
SPARK-10122 AttributeError: 'RDD' object has no attribute 'offsetRanges'
- Resolved
- links to