Description
During follow-up work on the PySpark worker reuse scenario, we found that worker reuse has no effect for `sc.parallelize(xrange(...))`.
This happens because the specialized rdd.parallelize for xrange (SPARK-4398) generates its data directly with xrange and never consumes the passed-in iterator. Leaving that iterator unconsumed breaks the end-of-stream check in the Python worker and ultimately prevents worker reuse.
The relevant code and further details are listed below.
The current specialized logic for xrange does not consume the passed-in iterator; see context.py:
if isinstance(c, xrange):
    ...
    def f(split, iterator):
        return xrange(getStart(split), getStart(split + 1), step)
    ...
    return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
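As a quick illustration of the symptom, one way to observe whether the same Python worker is reused is to compare worker PIDs across two jobs. The snippet below is only a reproduction sketch, not part of the original report; it assumes spark.python.worker.reuse is left at its default of true, and uses xrange as in the report (Python 2; use range on Python 3):

import os
from pyspark import SparkContext

sc = SparkContext("local[1]", "worker-reuse-check")

def worker_pid(_):
    # report the PID of the Python worker that processed this element
    return os.getpid()

# With worker reuse working, both jobs would normally report the same PID.
# With the xrange specialization the worker exits after each task, so the
# PIDs can differ between the two runs.
first = set(sc.parallelize(xrange(4), 1).map(worker_pid).collect())
second = set(sc.parallelize(xrange(4), 1).map(worker_pid).collect())
print(first, second)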
We get an unexpected value of -1, which corresponds to END_OF_DATA_SECTION, while checking for end of stream. See the code in worker.py:
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
    write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
    # write a different value to tell JVM to not reuse this worker
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    sys.exit(-1)
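For reference, the special length markers involved here are small negative integers; the definitions below mirror pyspark/serializers.py (reproduced for context, worth double-checking against the Spark version in use):

class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5

So when the iterator built from the data section is never consumed, the END_OF_DATA_SECTION marker (-1) is still pending in infile; read_int(infile) above then returns -1 instead of END_OF_STREAM (-4), the else branch is taken, and the worker exits instead of being reused.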
The code works well for parallelize(range) because END_OF_DATA_SECTION is handled while loading the iterator from the socket stream; see the code in FramedSerializer:
def load_stream(self, stream):
    while True:
        try:
            yield self._read_with_length(stream)
        except EOFError:
            return

...

def _read_with_length(self, stream):
    length = read_int(stream)
    if length == SpecialLengths.END_OF_DATA_SECTION:
        # END_OF_DATA_SECTION raises EOFError here and is caught in load_stream
        raise EOFError
    elif length == SpecialLengths.NULL:
        return None
    obj = stream.read(length)
    if len(obj) < length:
        raise EOFError
    return self.loads(obj)
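To make the interaction concrete, here is a minimal, self-contained sketch (not Spark code; it uses io.BytesIO in place of the socket stream and a simplified load_stream) showing why an unconsumed iterator leaves END_OF_DATA_SECTION pending for the worker's end-of-stream check:

import io
import struct

END_OF_DATA_SECTION = -1
END_OF_STREAM = -4

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

def load_stream(stream):
    # simplified stand-in for FramedSerializer.load_stream:
    # yield framed records until END_OF_DATA_SECTION is seen
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return
        yield stream.read(length)

# Fake worker input for an empty partition (as produced by
# parallelize([], numSlices)): no records, just the two markers.
buf = io.BytesIO()
write_int(END_OF_DATA_SECTION, buf)
write_int(END_OF_STREAM, buf)

# Case 1: the task consumes the iterator (as with parallelize(range)).
buf.seek(0)
list(load_stream(buf))                        # reads and swallows END_OF_DATA_SECTION
assert read_int(buf) == END_OF_STREAM         # -4: worker reuse can take effect

# Case 2: the task never touches the iterator (the xrange specialization).
buf.seek(0)
unused = load_stream(buf)                     # generator created but never advanced
assert read_int(buf) == END_OF_DATA_SECTION   # -1: the "unexpected value" seen in worker.py

Advancing the generator at least once, even for an empty partition, is what lets END_OF_DATA_SECTION be consumed; that is why the range path behaves correctly while the xrange specialization does not.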
Issue Links
- is related to SPARK-26573: Python worker not reused with mapPartitions if not consuming iterator (Resolved)