SPARK-12002: offsetRanges attribute missing in Kafka RDD when resuming from checkpoint


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: DStreams, PySpark
    • Labels: None

      Description

      SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed the problem of transforms chained onto Kafka RDDs yielding plain, non-Kafka RDDs. The same problem still appears, however, when a streaming application that uses Kafka direct streams is initialized from its checkpoint directory: the RDD passed to the first transformation arrives as a plain RDD, so the offsetRanges attribute is missing. The following is a representative example in which everything works as expected during the first run, but an exception is thrown on a subsequent run when the context is being restored from the checkpoint directory.

      test_checkpoint.py
      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils


      def attach_kafka_metadata(kafka_rdd):
          # Works on the first run; raises AttributeError once the context
          # is restored from a checkpoint, because kafka_rdd then arrives
          # as a plain RDD rather than a KafkaRDD.
          offset_ranges = kafka_rdd.offsetRanges()

          return kafka_rdd


      def create_context():
          sc = SparkContext(appName='kafka-test')
          ssc = StreamingContext(sc, 10)
          ssc.checkpoint(CHECKPOINT_URI)

          kafka_stream = KafkaUtils.createDirectStream(
              ssc,
              [TOPIC],
              kafkaParams={
                  'metadata.broker.list': BROKERS,
              },
          )
          kafka_stream.transform(attach_kafka_metadata).count().pprint()

          return ssc


      if __name__ == "__main__":
          # CHECKPOINT_URI, TOPIC, and BROKERS are deployment-specific
          # values elided from this report.
          ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
          ssc.start()
          ssc.awaitTermination()
      
      Exception on resuming from checkpoint
      Traceback (most recent call last):
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
          r = self.func(t, *rdds)
        File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda>
        File "/home/spark/batch/test_checkpoint.py", line 12, in attach_kafka_metadata
          offset_ranges = kafka_rdd.offsetRanges()
      AttributeError: 'RDD' object has no attribute 'offsetRanges'
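
      For applications on releases that predate the 1.6.0 fix, a minimal defensive sketch (an editorial addition, not part of the original report) is to guard the offsetRanges call, accepting that offset metadata is then simply unavailable for batches produced after a restore:

      def attach_kafka_metadata(kafka_rdd):
          # After a checkpoint restore the batch arrives as a plain pyspark
          # RDD, so only read offsetRanges when the attribute is present.
          if hasattr(kafka_rdd, 'offsetRanges'):
              offset_ranges = kafka_rdd.offsetRanges()
          return kafka_rdd

      This guard only suppresses the AttributeError; RDDs restored from the checkpoint still lose their Kafka offset metadata, which is the underlying problem tracked by this issue.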
      


              People

              • Assignee: Saisai Shao (jerryshao)
              • Reporter: Amit Ramesh (aramesh)
              • Votes: 0
              • Watchers: 5
