[SPARK-23961] pyspark toLocalIterator throws an exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.2, 2.2.1, 2.3.0
    • Fix Version/s: 3.0.0
    • Component/s: PySpark

    Description

      Given a DataFrame, call toLocalIterator on it. If we do not consume all records, it throws:

      ERROR PythonRDD: Error while sending iterator
      java.net.SocketException: Connection reset by peer: socket write error
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
      at java.io.DataOutputStream.write(DataOutputStream.java:107)
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)

       

      To reproduce, here is a simple pyspark shell script that shows the error:

      import itertools
      df = spark.read.parquet("large parquet folder").cache()
      print(df.count())
      b = df.toLocalIterator()
      print(len(list(itertools.islice(b, 20))))
      b = None # Make the iterator go out of scope.  Throws here.
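
      As a workaround (a minimal sketch, based on the observation below that consuming all records does not trigger the error), the remaining records can be drained before the iterator is discarded:

      # Hedged workaround sketch: exhaust the local iterator before dropping the
      # reference, since the error only appears when records are left unconsumed.
      for _ in b:
          pass
      b = None  # the iterator is already fully consumed, so no error is expected here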

       

      Observations:

      • Consuming all records does not throw; taking only a subset of the partitions triggers the error.
      • In another experiment, doing the same on a regular RDD works if we cache/materialize it first. If we do not cache the RDD, it throws a similar error (see the sketch after this list).
      • It works in the Scala shell.
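
      For reference, a rough sketch of the RDD experiment mentioned above (assuming the same pyspark shell session; the path is a placeholder):

      # Hypothetical sketch of the RDD experiment; "large text folder" is a placeholder path.
      rdd = sc.textFile("large text folder").cache()
      print(rdd.count())  # materialize the cached RDD first
      it = rdd.toLocalIterator()
      print(len(list(itertools.islice(it, 20))))
      it = None  # with the cached RDD this did not throw; without .cache() it fails similarly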

       

    People

    • Assignee: Bryan Cutler (bryanc)
    • Reporter: Michel Lemay (FlamingMike)
    • Votes: 0
    • Watchers: 4
