Description
SPARK-8389 added the offsetRanges interface to Kafka direct streams. However, this appears to break when operations are chained after a transform operation. The following example code results in an error (stack trace below). Note that if the count() operation is removed from the example, the error no longer occurs and the Kafka data is printed.
kafka_test.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def attach_kafka_metadata(kafka_rdd):
    offset_ranges = kafka_rdd.offsetRanges()
    return kafka_rdd


if __name__ == "__main__":
    sc = SparkContext(appName='kafka-test')
    ssc = StreamingContext(sc, 10)

    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],
        kafkaParams={
            'metadata.broker.list': BROKERS,
        },
    )
    kafka_stream.transform(attach_kafka_metadata).count().pprint()

    ssc.start()
    ssc.awaitTermination()
Stack trace
Traceback (most recent call last):
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
    func = lambda t, rdd: oldfunc(rdd)
  File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
    offset_ranges = kafka_rdd.offsetRanges()
AttributeError: 'RDD' object has no attribute 'offsetRanges'
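The dstream.py frames at line 616 show transform functions being composed ("pipelined") into a single lambda, with the Kafka wrapper from kafka.py line 332 nested inside it. A plausible reading, not confirmed here, is that chaining count() folds the Kafka-specific transform into a generic pipeline, which then hands the user function a plain RDD rather than a KafkaRDD. The pure-Python toy model below (no PySpark needed) illustrates that hypothesis. Every class in it is a hypothetical stand-in, not PySpark's actual code; the hypothetical `fold_subclasses` flag contrasts an `isinstance`-based folding check with a stricter `type(...) is` check that refuses to fold subclasses such as a Kafka-specific stream.

```python
# Toy model (hypothetical, NOT PySpark's implementation) of DStream transform
# pipelining, illustrating how chaining count() after a Kafka transform can
# hand the user function a plain RDD instead of a KafkaRDD.


class RDD:
    """Stand-in for a plain PySpark RDD."""

    def __init__(self, data):
        self.data = data


class KafkaRDD(RDD):
    """Stand-in for a KafkaRDD; only this class exposes offsetRanges()."""

    def offsetRanges(self):
        return []  # a real KafkaRDD would return its offset ranges here


class TransformedDStream:
    """Generic transformed stream: passes the parent's RDD through unchanged."""

    def __init__(self, prev, func, fold_subclasses):
        self.prev = prev
        self.func = func
        # Pipelining: fold this transform and its parent into one function.
        # isinstance() also matches subclasses (e.g. a Kafka-specific stream);
        # type(...) is ... matches only the generic class.
        if fold_subclasses:
            foldable = isinstance(prev, TransformedDStream)
        else:
            foldable = type(prev) is TransformedDStream
        if foldable:
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev

    def wrap(self, rdd):
        return rdd  # generic streams do not wrap their input

    def compute(self, t, source_rdd):
        # Evaluate the parent first if it is itself a transformed stream.
        if isinstance(self.prev, TransformedDStream):
            rdd = self.prev.compute(t, source_rdd)
        else:
            rdd = source_rdd
        return self.func(t, self.wrap(rdd))


class KafkaTransformedDStream(TransformedDStream):
    """Kafka-specific transform: wraps its input RDD as a KafkaRDD."""

    def wrap(self, rdd):
        return KafkaRDD(rdd.data)


def attach_kafka_metadata(t, rdd):
    rdd.offsetRanges()  # fails with AttributeError on a plain RDD
    return rdd


def count(t, rdd):
    return len(rdd.data)


def run(fold_subclasses):
    source = object()  # stands in for the direct Kafka input stream
    kafka = KafkaTransformedDStream(source, attach_kafka_metadata, fold_subclasses)
    counted = TransformedDStream(kafka, count, fold_subclasses)
    return counted.compute(0, RDD([1, 2, 3]))
```

In this model, run(fold_subclasses=False) returns 3, while run(fold_subclasses=True) raises AttributeError: 'RDD' object has no attribute 'offsetRanges', the same error as in the stack trace. Refusing to fold subclassed streams keeps the Kafka wrapper in place, at the cost of one less pipelining optimization.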
Issue Links
- is related to: SPARK-8389 Expose KafkaRDDs offsetRange in Python (Resolved)
- relates to: SPARK-12002 offsetRanges attribute missing in Kafka RDD when resuming from checkpoint (Resolved)
- links to