Spark / SPARK-23961

pyspark toLocalIterator throws an exception

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.2, 2.2.1, 2.3.0
    • Fix Version/s: 3.0.0
    • Component/s: PySpark

      Description

      Given a DataFrame, use toLocalIterator(). If we do not consume all records, it throws:

      ERROR PythonRDD: Error while sending iterator
      java.net.SocketException: Connection reset by peer: socket write error
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
      at java.io.DataOutputStream.write(DataOutputStream.java:107)
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)

       

      To reproduce, here is a simple pyspark shell script that shows the error:

      import itertools
      df = spark.read.parquet("large parquet folder").cache()
      print(df.count())
      b = df.toLocalIterator()
      print(len(list(itertools.islice(b, 20))))  # consume only the first 20 records
      b = None  # let the iterator go out of scope; the error above is thrown here
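
      A possible workaround, based on the observation below that consuming every record avoids the error (a minimal sketch, not a verified fix): drain the iterator before discarding it, so the JVM writer thread finishes sending all partitions.

      import itertools

      b = df.toLocalIterator()
      wanted = list(itertools.islice(b, 20))  # the records we actually need

      # Drain the remainder so the Python side reads the stream to the end;
      # the JVM side can then finish writing instead of hitting a closed socket.
      for _ in b:
          pass
      b = None  # the iterator is exhausted, so discarding it should not throw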

       

      Observations:

      • Consuming all records does not throw.  Taking only a subset of the partitions creates the error.
      • In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws similarly (a sketch of this experiment follows the list).
      • It works in the Scala shell.
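
      For reference, here is a rough sketch of that second RDD experiment. The data source, size, and partition count are illustrative assumptions; the report does not specify them.

      import itertools

      # Cached/materialized RDD: per the observation above, taking only a
      # partial slice from toLocalIterator works.
      cached = sc.parallelize(range(1000000), 16).cache()
      cached.count()  # materialize the cache
      it = cached.toLocalIterator()
      print(len(list(itertools.islice(it, 20))))
      it = None  # no error reported in this case

      # Uncached RDD: per the observation above, the same partial consumption
      # reproduces the "Connection reset by peer" error on the JVM side.
      uncached = sc.parallelize(range(1000000), 16)
      it = uncached.toLocalIterator()
      print(len(list(itertools.islice(it, 20))))
      it = None  # error thrown here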

       

    People

    • Assignee: bryanc (Bryan Cutler)
    • Reporter: FlamingMike (Michel Lemay)
    • Votes: 0
    • Watchers: 5
