Description
PySpark's RDD.collect, DataFrame.toLocalIterator, and DataFrame.collect all work by starting an ephemeral server in the driver and having Python connect to it to download the data.
All three are implemented along the lines of:
port = self._jdf.collectToPython()
return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
The server has *a hardcoded timeout of 3 seconds* (https://github.com/apache/spark/blob/e26dac5feb02033f980b1e69c9b0ff50869b6f9e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L695) – i.e., the Python process has 3 seconds to connect to it from the very moment the driver server starts.
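For illustration, here is a minimal, self-contained Python sketch of the same handshake (not Spark's actual Scala server code): an ephemeral server that only waits a few seconds in accept, and a client that has to connect before that window closes:

import socket
import threading

def start_ephemeral_server(accept_timeout=3.0):
    # Bind to an ephemeral port on localhost, like the driver-side server.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", 0))
    server.listen(1)
    # Analogue of the hardcoded 3-second timeout on the Scala side.
    server.settimeout(accept_timeout)
    port = server.getsockname()[1]

    def serve():
        try:
            conn, _ = server.accept()   # gives up if the client is late
            conn.sendall(b"payload")
            conn.close()
        except socket.timeout:
            print("server: client did not connect in time")
        finally:
            server.close()

    threading.Thread(target=serve, daemon=True).start()
    return port

port = start_ephemeral_server()
# Anything that delays this connect by more than ~3 seconds (a slow
# getaddrinfo, a GC pause, ...) means the server has already given up.
client = socket.create_connection(("localhost", port))
print(client.recv(1024))
client.close()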
In general, a 3-second window seems fine, but I have been encountering frequent timeouts leading to `Exception: could not open socket`.
After investigating a bit, it turns out that _load_from_socket makes a call to getaddrinfo:
def _load_from_socket(port, serializer):
    sock = None
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        .. connect ..
I am not sure why, but while most such calls to getaddrinfo on my machine take only a couple of milliseconds, about 10% of them take between 2 and 10 seconds, leading to roughly 10% of jobs failing. I don't think we can always expect getaddrinfo to return instantly. More generally, Python may sometimes pause for a couple of seconds, which may not leave the process enough time to connect to the server.
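A quick way to confirm that getaddrinfo itself is the slow step is to time a batch of lookups and look for outliers, e.g.:

import socket
import time

slow = []
for i in range(100):
    start = time.time()
    # The port value does not matter for the lookup itself.
    socket.getaddrinfo("localhost", 8000, socket.AF_UNSPEC, socket.SOCK_STREAM)
    elapsed = time.time() - start
    if elapsed > 1.0:
        slow.append((i, round(elapsed, 2)))

print("lookups slower than 1s:", slow)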
Especially since the server timeout is hardcoded, I think it would be best to set a rather generous value (15 seconds?) to avoid such situations.
A getaddrinfo-specific fix could avoid calling it on every single request, or call it before starting up the driver server, as sketched below.
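As an illustration only (not a patch; the helper name _localhost_addrinfo is made up, and the makefile/load_stream tail is assumed to mirror what PySpark already does): resolve "localhost" once, cache the result, and only splice the port in per call, so the lookup cost is not paid inside the 3-second window:

import socket

_cached_addrinfo = None

def _localhost_addrinfo(port):
    # Resolve "localhost" once and cache it; only the port differs per call.
    global _cached_addrinfo
    if _cached_addrinfo is None:
        _cached_addrinfo = socket.getaddrinfo(
            "localhost", None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    return [(af, socktype, proto, canonname, (sa[0], port) + sa[2:])
            for af, socktype, proto, canonname, sa in _cached_addrinfo]

def _load_from_socket(port, serializer):
    sock = None
    for af, socktype, proto, canonname, sa in _localhost_addrinfo(port):
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(15)  # a more generous connect timeout, as suggested above
            sock.connect(sa)
        except socket.error:
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket")
    return serializer.load_stream(sock.makefile("rb", 65536))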
Issue Links
- is related to SPARK-18649: sc.textFile(my_file).collect() raises socket.timeout on large files (Resolved)