Spark / SPARK-17110

PySpark with locality ANY throws java.io.StreamCorruptedException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: PySpark
    • Labels: None
    • Environment: Cluster of 2 AWS r3.xlarge slaves launched via the ec2 scripts, Spark 2.0.0, Hadoop: YARN, pyspark shell

    Description

      In PySpark 2.0.0, any task that reads cached data non-locally (locality level ANY, i.e. the cached block is fetched from another executor) throws a StreamCorruptedException like the one in the stack trace below:

      WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
              at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
              at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
              at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
              at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
              at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
              at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
              at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
              at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
              at scala.Option.map(Option.scala:146)
              at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
              at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
              at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
              at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
              at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
              at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
              at org.apache.spark.scheduler.Task.run(Task.scala:85)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
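
The header value in the message can be read directly. `java.io.ObjectInputStream` expects a stream to begin with the magic number 0xACED followed by the version 0x0005, whereas the first four bytes here were 12 01 0A 80. A minimal sketch of that check, written in Python purely for illustration (the function name `has_java_stream_header` is hypothetical and not part of Spark):

```python
import struct

# Java object serialization streams start with STREAM_MAGIC (0xACED)
# followed by STREAM_VERSION (0x0005).
JAVA_STREAM_MAGIC = 0xACED
JAVA_STREAM_VERSION = 0x0005


def has_java_stream_header(data: bytes) -> bool:
    """Return True if data begins with the Java serialization stream header."""
    if len(data) < 4:
        return False
    # Two big-endian unsigned shorts: magic, then version.
    magic, version = struct.unpack(">HH", data[:4])
    return magic == JAVA_STREAM_MAGIC and version == JAVA_STREAM_VERSION


reported = bytes.fromhex("12010A80")  # header from the exception message
valid = bytes.fromhex("ACED0005")     # header a Java-serialized stream would have

print(has_java_stream_header(reported))  # False
print(has_java_stream_header(valid))     # True
```

This suggests that the bytes fetched from the remote block were not written by the Java serializer, although the check alone does not say which format they were in.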
      

      The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of 2 slaves set to use only one worker core each:

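      import time

      # Cache 9 single-element partitions; the element 1000 makes its task sleep far longer than the rest.
      x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
      x.count()  # materialize the cache

      def waitMap(seconds):
          # Sleep for the element's value, then pass it through unchanged.
          time.sleep(seconds)
          return seconds

      # The long sleep is intended to tie up one executor so that the other reads cached partitions non-locally.
      x.map(waitMap).count()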
      

      Or by running the following via spark-submit:

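      import time

      from pyspark import SparkContext

      sc = SparkContext()

      # Cache 9 single-element partitions; the element 1000 makes its task sleep far longer than the rest.
      x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
      x.count()  # materialize the cache

      def waitMap(seconds):
          # Sleep for the element's value, then pass it through unchanged.
          time.sleep(seconds)
          return seconds

      # The long sleep is intended to tie up one executor so that the other reads cached partitions non-locally.
      x.map(waitMap).count()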
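
Both reproductions assume two executors with a single core each. The report does not say how that was configured, so the following is only a sketch under stated assumptions: it assumes a standalone cluster manager, where `spark.executor.cores` and `spark.cores.max` limit the application to two one-core executors (on YARN, `spark.executor.instances` would be the counterpart to `spark.cores.max`). `spark.locality.wait` is shown at its documented default of 3s; lowering it might make the scheduler fall back to locality ANY sooner, but that is untested here. The helper names `repro_conf_pairs` and `make_context` are hypothetical.

```python
def repro_conf_pairs(locality_wait="3s"):
    """Return Spark settings for two single-core executors (illustrative assumption)."""
    return [
        ("spark.executor.cores", "1"),   # one core per executor
        ("spark.cores.max", "2"),        # two cores in total (standalone mode)
        ("spark.locality.wait", locality_wait),  # Spark's default is 3s
    ]


def make_context():
    """Create a SparkContext with the settings above; requires pyspark."""
    # Imported lazily so the settings helper can be used without pyspark installed.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAll(repro_conf_pairs())
    return SparkContext(conf=conf)
```

In the spark-submit script, `sc = make_context()` could stand in for `sc = SparkContext()`; the rest of the reproduction would stay the same.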
      

          People

            Assignee: Josh Rosen (joshrosen)
            Reporter: Tomer Kaftan (tomerk)