Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: None
Fix Version/s: None
Environment: PySpark version 1.6.2
Description
I'm trying to load a file into the driver with this code:
contents = sc.textFile('hdfs://path/to/big_file.csv').collect()
Loading the data into the driver instead of keeping it as a distributed RDD is intentional in this case. The file is ca. 6 GB, and I have increased driver memory accordingly so the local data fits. After some time, my spark-submitted job crashes with the stack trace below.
I have traced this to pyspark/rdd.py, where the _load_from_socket() function creates a socket with a hard-coded timeout of 3 seconds (the same code is present in HEAD, although I'm on PySpark 1.6.2). Raising this hard-coded value to e.g. 600 seconds lets me read the entire file. A workaround sketch follows below.
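As a stopgap, the function can be rebound from user code before calling collect(). A minimal sketch of that workaround, assuming the 1.6.2 layout: the body below only approximates the original source, and the patched name and the 600-second value are my own, not part of Spark.

import socket

import pyspark.rdd

def _load_from_socket_patched(port, serializer, timeout=600):
    # Approximation of pyspark/rdd.py:_load_from_socket() from 1.6.2,
    # with the hard-coded sock.settimeout(3) replaced by a parameter.
    sock = None
    # Try both IPv4 and IPv6 addresses for the local callback server.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
                                  socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(timeout)  # upstream hard-codes 3 seconds here
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    try:
        rf = sock.makefile("rb", 65536)
        for item in serializer.load_stream(rf):
            yield item
    finally:
        sock.close()

# collect() resolves _load_from_socket through the module namespace,
# so rebinding the module attribute is enough.
pyspark.rdd._load_from_socket = _load_from_socket_patched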
Is there a reason this value does not use e.g. the 'spark.network.timeout' setting instead?
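For illustration, the timeout could be looked up along these lines. This is a sketch only: the helper name and the naive duration parsing are mine, and reaching the conf via the private SparkContext._active_spark_context attribute is an assumption about the internals.

from pyspark import SparkContext

def _socket_timeout_seconds(default=120):
    # Hypothetical helper: fall back to the documented 120s default of
    # spark.network.timeout when no context is active. Real code would
    # need to parse all duration suffixes, not just a trailing "s".
    sc = SparkContext._active_spark_context  # private attribute, sketch only
    if sc is None:
        return default
    raw = sc.getConf().get("spark.network.timeout", "%ds" % default)
    return int(raw.rstrip("s"))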
Traceback (most recent call last):
  File "my_textfile_test.py", line 119, in <module>
    contents = sc.textFile('hdfs://path/to/file.csv').collect()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 517, in load_stream
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 511, in loads
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
socket.timeout: timed out
16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
    Suppressed: java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
        ... 3 more
16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
    Suppressed: java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
        ... 1 more
        Suppressed: java.net.SocketException: Broken pipe
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
            at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
            at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
            ... 3 more
Issue Links
relates to:
- SPARK-21551 pyspark's collect fails when getaddrinfo is too slow (Resolved)
- SPARK-18281 toLocalIterator yields time out error on pyspark2 (Resolved)