Description
In PySpark 2.0.0, any task that accesses cached data non-locally throws a StreamCorruptedException, as in the stack trace below:
WARN TaskSetManager: Lost task 7.0 in stage 2.0 (TID 26, 172.31.26.184): java.io.StreamCorruptedException: invalid stream header: 12010A80
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:807)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:302)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
	at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
	at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
The simplest way I have found to reproduce this is by running the following code in the pyspark shell, on a cluster of two slaves each configured to use only one worker core:
x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
x.count()  # materialize the cached partitions

import time

def waitMap(x):
    time.sleep(x)
    return x

# the 1000-second element keeps one worker core busy, so later tasks read cached partitions non-locally
x.map(waitMap).count()
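For reference, one way to get a two-slave standalone cluster with a single core per worker is via conf/spark-env.sh on each slave; this is a sketch of an assumed setup, not the configuration of the original cluster:

# conf/spark-env.sh (assumed standalone deployment)
export SPARK_WORKER_CORES=1   # limit each worker to a single core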
Or by running the following via spark-submit:
from pyspark import SparkContext

sc = SparkContext()

x = sc.parallelize([1, 1, 1, 1, 1, 1000, 1, 1, 1], numSlices=9).cache()
x.count()

import time

def waitMap(x):
    time.sleep(x)
    return x

x.map(waitMap).count()
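As a usage sketch, the script above can be saved to a file and submitted against the same two-slave cluster; the file name and master URL below are placeholders, not values from the original report:

# hypothetical invocation; substitute your own master URL and script path
spark-submit --master spark://<master-host>:7077 cache_repro.py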