Spark / SPARK-26549

PySpark worker reuse takes no effect for parallelize xrange

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: PySpark
    • Labels:
      None

      Description

      During follow-up work on the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`.
      This happens because the specialized rdd.parallelize logic for xrange (SPARK-4398) generates the data from xrange itself and never consumes the passed-in iterator. Leaving the iterator unconsumed breaks the end-of-stream check in the Python worker, which ultimately prevents worker reuse from taking effect.

      The relevant code blocks and more details are listed below.
      The current specialized logic for xrange does not use the passed-in iterator, context.py:

      if isinstance(c, xrange):
          ...
          def f(split, iterator):
              return xrange(getStart(split), getStart(split + 1), step)
          ...
          return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
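
      Since the data comes entirely from xrange, draining the (empty) passed-in iterator before generating the partition is enough to make the framing markers get consumed, which is the direction the fix takes. A minimal standalone sketch of that idea; `make_partition_func` and the inlined `getStart`/`step` here are simplified stand-ins for illustration, not Spark's actual code:

      ```python
      def make_partition_func(size, num_slices):
          step = 1

          def getStart(split):
              # Start index of a partition, as in the parallelize slicing logic
              return split * size // num_slices

          def f(split, iterator):
              # Exhaust the empty iterator so load_stream runs at least once
              # and consumes the framing markers from the socket stream.
              assert len(list(iterator)) == 0
              return range(getStart(split), getStart(split + 1), step)

          return f

      f = make_partition_func(10, 2)
      first_partition = list(f(0, iter([])))
      ```

      With 10 elements over 2 slices, partition 0 covers indices 0 through 4.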
      

      During the end-of-stream check we then read an unexpected value of -1, which corresponds to END_OF_DATA_SECTION. See the code in worker.py:

      # check end of stream
      if read_int(infile) == SpecialLengths.END_OF_STREAM:
          write_int(SpecialLengths.END_OF_STREAM, outfile)
      else:
          # write a different value to tell JVM to not reuse this worker
          write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
          sys.exit(-1)
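
      The failure mode can be reproduced without sockets: if the task never consumes the framed data section, END_OF_DATA_SECTION (-1) is still sitting in the stream when the reuse check runs. A minimal sketch, assuming the SpecialLengths values from pyspark.serializers (END_OF_DATA_SECTION = -1, END_OF_STREAM = -4):

      ```python
      import struct
      from io import BytesIO

      # Assumed constants mirroring pyspark.serializers.SpecialLengths
      END_OF_DATA_SECTION = -1
      END_OF_STREAM = -4

      def read_int(stream):
          # Big-endian signed int, as in the Python/JVM worker protocol
          return struct.unpack("!i", stream.read(4))[0]

      def write_int(value, stream):
          stream.write(struct.pack("!i", value))

      # The JVM side ends the task data with END_OF_DATA_SECTION,
      # followed by END_OF_STREAM.
      infile = BytesIO()
      write_int(END_OF_DATA_SECTION, infile)
      write_int(END_OF_STREAM, infile)
      infile.seek(0)

      # The xrange path never read the data section, so the "check end of
      # stream" read sees END_OF_DATA_SECTION instead of END_OF_STREAM,
      # and the worker tells the JVM not to reuse it.
      marker = read_int(infile)
      ```

      The worker then writes END_OF_DATA_SECTION back and exits, so the JVM discards it instead of reusing it.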
      

      The code works well for parallelize(range) because END_OF_DATA_SECTION has already been consumed while loading the iterator from the socket stream. See the code in FramedSerializer:

      def load_stream(self, stream):
          while True:
              try:
                  yield self._read_with_length(stream)
              except EOFError:
                  return
      ...
      def _read_with_length(self, stream):
          length = read_int(stream)
          if length == SpecialLengths.END_OF_DATA_SECTION:
              raise EOFError  # END_OF_DATA_SECTION raises EOFError here, caught in load_stream
          elif length == SpecialLengths.NULL:
              return None
          obj = stream.read(length)
          if len(obj) < length:
              raise EOFError
          return self.loads(obj)
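
      For contrast, a self-contained sketch of this framing logic (with `self.loads` replaced by returning the raw bytes, and the same assumed SpecialLengths values) shows that draining load_stream consumes END_OF_DATA_SECTION, leaving END_OF_STREAM in the stream for the reuse check:

      ```python
      import struct
      from io import BytesIO

      # Assumed constants mirroring pyspark.serializers.SpecialLengths
      END_OF_DATA_SECTION = -1
      END_OF_STREAM = -4
      NULL = -5

      def read_int(stream):
          return struct.unpack("!i", stream.read(4))[0]

      def write_int(value, stream):
          stream.write(struct.pack("!i", value))

      class FramedSerializerSketch:
          """Simplified stand-in for FramedSerializer's framing logic."""

          def load_stream(self, stream):
              while True:
                  try:
                      yield self._read_with_length(stream)
                  except EOFError:
                      return

          def _read_with_length(self, stream):
              length = read_int(stream)
              if length == END_OF_DATA_SECTION:
                  raise EOFError  # consumed here, caught in load_stream
              elif length == NULL:
                  return None
              obj = stream.read(length)
              if len(obj) < length:
                  raise EOFError
              return obj  # real code would deserialize with self.loads(obj)

      # Two framed payloads, then the data-section and stream markers.
      infile = BytesIO()
      for payload in (b"a", b"bc"):
          write_int(len(payload), infile)
          infile.write(payload)
      write_int(END_OF_DATA_SECTION, infile)
      write_int(END_OF_STREAM, infile)
      infile.seek(0)

      items = list(FramedSerializerSketch().load_stream(infile))
      # Draining the iterator consumed END_OF_DATA_SECTION, so the reuse
      # check now reads END_OF_STREAM as expected.
      tail = read_int(infile)
      ```

      This is exactly what the range path does and the xrange path skips, which is why only the latter breaks worker reuse.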
      

      People

      • Assignee: XuanYuan Yuanjian Li
      • Reporter: XuanYuan Yuanjian Li
      • Votes: 0
      • Watchers: 2
