  1. Spark
  2. SPARK-18649

sc.textFile(my_file).collect() raises socket.timeout on large files

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • None
    • None
    • Component/s: PySpark
    • Environment: PySpark version 1.6.2

    Description

      I'm trying to load a file into the driver with this code:

      contents = sc.textFile('hdfs://path/to/big_file.csv').collect()

      Loading into the driver instead of creating a distributed RDD is intentional in this case. The file is about 6 GB, and I have adjusted driver memory accordingly to fit the local data. After some time, my spark-submit job crashes with the stack trace below.

      I have traced this to pyspark/rdd.py, where the _load_from_socket() function creates a socket with a hard-coded timeout of 3 seconds (this code is also present in HEAD, although I'm on PySpark 1.6.2). Raising this hard-coded value to e.g. 600 lets me read the entire file.
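To illustrate the failure mode outside of Spark, here is a minimal sketch using only the standard library. It is not the PySpark code: the names serve_after_delay and read_with_timeout are made up for this example, and the delays are scaled down to fractions of a second. A local server stands in for the JVM side and waits before sending any data, while the reader uses a fixed socket timeout in the same way that settimeout(3) does.

```python
import socket
import threading
import time


def serve_after_delay(server_sock, delay, payload):
    """Accept one connection, wait `delay` seconds, then send `payload`."""
    conn, _ = server_sock.accept()
    try:
        time.sleep(delay)  # stands in for the sender not having data ready yet
        conn.sendall(payload)
    except OSError:
        pass  # the reader gave up and closed its end first
    finally:
        conn.close()


def read_with_timeout(timeout, delay, payload=b"row1\nrow2\n"):
    """Read everything from a local socket using a fixed read timeout."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    worker = threading.Thread(
        target=serve_after_delay, args=(server, delay, payload)
    )
    worker.start()
    client = socket.create_connection(server.getsockname())
    try:
        client.settimeout(timeout)  # the value that is hard-coded in the report
        chunks = []
        while True:
            chunk = client.recv(4096)  # raises socket.timeout if no data arrives in time
            if not chunk:
                break  # sender closed the connection: all data received
            chunks.append(chunk)
        return b"".join(chunks)
    finally:
        client.close()
        worker.join()
        server.close()
```

Calling read_with_timeout(timeout=0.1, delay=0.5) raises socket.timeout because the sender is slower than the reader is willing to wait, while read_with_timeout(timeout=2.0, delay=0.5) returns the full payload. This matches what I see when raising the hard-coded value.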

      Is there any reason that this value does not use e.g. the 'spark.network.timeout' setting instead?
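As a rough sketch of what I have in mind, the timeout could be derived from the configuration instead of being a literal. The helper below is hypothetical and is not part of PySpark. It assumes a conf object with a get(key, default) method (a plain dict is used here for illustration), it only understands a simplified subset of Spark's time suffixes, and it falls back to 120s, which is the documented default of spark.network.timeout.

```python
import re

# Simplified subset of the time suffixes Spark accepts (seconds per unit).
_UNIT_SECONDS = {"ms": 0.001, "s": 1, "m": 60, "min": 60, "h": 3600}


def timeout_seconds(conf, key="spark.network.timeout", default="120s"):
    """Return the configured timeout in seconds (hypothetical helper)."""
    raw = conf.get(key, default).strip().lower()
    match = re.fullmatch(r"(\d+)\s*([a-z]*)", raw)
    if match is None:
        raise ValueError("cannot parse time value: %r" % raw)
    number, unit = match.groups()
    unit = unit or "s"  # assumption: a bare number means seconds
    if unit not in _UNIT_SECONDS:
        raise ValueError("unsupported time unit: %r" % unit)
    return int(number) * _UNIT_SECONDS[unit]


# Usage with a plain dict standing in for the Spark configuration:
conf = {"spark.network.timeout": "600s"}
sock_timeout = timeout_seconds(conf)  # value to pass to sock.settimeout()
```

With something like this, users who expect long transfers to the driver could raise the limit through configuration rather than by patching rdd.py.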

      Traceback (most recent call last):
        File "my_textfile_test.py", line 119, in <module>
          contents = sc.textFile('hdfs://path/to/file.csv').collect()
        File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
        File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
        File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 517, in load_stream
        File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 511, in loads
        File "/usr/lib/python2.7/socket.py", line 380, in read
          data = self._sock.recv(left)
      socket.timeout: timed out
      16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe
      java.net.SocketException: Broken pipe
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
      at java.io.DataOutputStream.flush(DataOutputStream.java:123)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
      Suppressed: java.net.SocketException: Broken pipe
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
      ... 3 more
      16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator
      java.net.SocketException: Connection reset
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
      at java.io.DataOutputStream.write(DataOutputStream.java:107)
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
      at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
      at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
      at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
      Suppressed: java.net.SocketException: Broken pipe
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
      at java.io.DataOutputStream.flush(DataOutputStream.java:123)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
      at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
      ... 1 more
      Suppressed: java.net.SocketException: Broken pipe
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
      at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
      at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
      at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
      ... 3 more

            People

              Assignee: Unassigned
              Reporter: Erik Cederstrand (erikcederstrand)
              Votes: 2
              Watchers: 9