Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.4.1
Fix Version/s: None
Description
I am trying to use wholeTextFiles with PySpark, and now I am getting the same error:
textFiles = sc.wholeTextFiles('/file/content')
textFiles.take(1)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Volumes/work/bigdata/CHD5.4/spark-1.4.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1277, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/Volumes/work/bigdata/CHD5.4/spark-1.4.1-bin-hadoop2.6/python/pyspark/context.py", line 898, in runJob
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
  File "/Volumes/work/bigdata/CHD5.4/spark-1.4.1-bin-hadoop2.6/python/pyspark/rdd.py", line 138, in _load_from_socket
    raise Exception("could not open socket")
Exception: could not open socket
>>> 15/08/24 20:09:27 ERROR PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:404)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:623)
Current piece of code in rdd.py:
rdd.py
def _load_from_socket(port, serializer):
    sock = None
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(3)
            sock.connect(sa)
        except socket.error:
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    try:
        rf = sock.makefile("rb", 65536)
        for item in serializer.load_stream(rf):
            yield item
    finally:
        sock.close()
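To make the failure mode concrete, here is a minimal, self-contained sketch (not Spark code; the deliberately wrong port is just a stand-in for whatever keeps the Python side from reaching the JVM's server socket) showing how the two log messages pair up: the server's accept times out while the client's connect loop exhausts its addresses.

import socket
import threading

# Server side: stands in for the JVM's ServerSocket, which waits for
# the Python client with a short accept timeout.
def jvm_like_server(server):
    server.settimeout(3)  # mirrors the accept timeout seen in the JVM log
    try:
        conn, _ = server.accept()
        conn.close()
    except socket.timeout:
        print("server: Accept timed out")  # the JVM-side symptom

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]
threading.Thread(target=jvm_like_server, args=(server,)).start()

# Client side: the same getaddrinfo/connect loop as _load_from_socket,
# deliberately pointed at a neighbouring port where (most likely)
# nothing is listening.
sock = None
for res in socket.getaddrinfo("localhost", port + 1,
                              socket.AF_UNSPEC, socket.SOCK_STREAM):
    af, socktype, proto, canonname, sa = res
    try:
        sock = socket.socket(af, socktype, proto)
        sock.settimeout(3)
        sock.connect(sa)
    except socket.error:
        sock = None
        continue
    break
if not sock:
    print("client: could not open socket")  # the Python-side symptom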
On further investigation of the issue, I realized that in context.py, runJob is not actually starting the server, so there is nothing to connect to:
context.py
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
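One quick way to test that suspicion (a hypothetical debugging helper, not part of Spark) is to probe the port returned by PythonRDD.runJob and see whether anything is listening there before _load_from_socket gives up:

import socket

def probe_port(port, timeout=10):
    """Try every address that 'localhost' resolves to and report the result."""
    for res in socket.getaddrinfo("localhost", port,
                                  socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        s = socket.socket(af, socktype, proto)
        s.settimeout(timeout)
        try:
            s.connect(sa)
        except socket.error as e:
            print("no listener at %s: %s" % (sa, e))
        else:
            print("connected to %s" % (sa,))
        finally:
            s.close()

If probe_port connects but _load_from_socket still fails, the 3-second connect timeout in _load_from_socket is the likely culprit; if it never connects, the server really was not started.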