SPARK-29017: JobGroup and LocalProperty not respected by PySpark

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.4
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      PySpark has setJobGroup and setLocalProperty methods, which are intended to set properties that only affect the calling thread. They try to do this by calling the equivalent JVM functions via Py4J.

      However, nothing ensures that subsequent Py4J calls from a given Python thread are handled by the same thread on the Java side. In effect, these methods might appear to work some of the time, if you happen to get lucky and hit the same thread on the Java side. But sometimes they won't work, and in fact they're less likely to work if there are multiple threads in Python submitting jobs.
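The failure mode can be simulated in pure Python, without a JVM. Here `jvm_set_local_property`, `jvm_run_job`, and the two single-thread executors are hypothetical stand-ins for Py4J's server-side thread handling; each executor models one JVM handler thread with its own thread-local properties:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Simulated "JVM side": each Java thread keeps its own thread-local
# properties, mimicking how local properties behave on a real JVM.
_jvm_local = threading.local()

def jvm_set_local_property(key, value):
    # Runs on whichever "Java" thread the call happens to land on.
    setattr(_jvm_local, key, value)

def jvm_run_job():
    # The job reads the property from *its own* thread's locals.
    return getattr(_jvm_local, "jobGroup", None)

# Two single-thread executors stand in for two JVM handler threads.
handler_a = ThreadPoolExecutor(max_workers=1)
handler_b = ThreadPoolExecutor(max_workers=1)

# setJobGroup happens to be served by thread A ...
handler_a.submit(jvm_set_local_property, "jobGroup", "a").result()

# ... but the later job submission lands on thread B, which never saw it.
group_seen_by_job = handler_b.submit(jvm_run_job).result()
print(group_seen_by_job)  # None -- the job group was silently dropped
```

If the second call had happened to land back on thread A, the group would have been visible, which is exactly the "works some of the time" behavior described above.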

      I think the right way to fix this is to keep a Python thread-local tracking these properties, and then send them through to the JVM on calls to submitJob. This is going to be a headache to get right, though; we've also got to handle implicit calls, e.g. rdd.collect(), rdd.foreach(), etc. And of course users may have defined their own functions, which will be broken until they're fixed to use the same thread-locals.
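A minimal sketch of the proposed fix, again with no JVM involved: `set_job_group` and `submit_job` are hypothetical names, not PySpark API. The point is that the property lives in a Python `threading.local` and is shipped explicitly with each job submission, so JVM thread affinity no longer matters:

```python
import threading

# Python-side thread-local storage for job properties.
_props = threading.local()

def set_job_group(group_id):
    # Only records the group on the Python side.
    _props.job_group = group_id

def submit_job(job):
    # Hypothetical submit path: the current thread's properties are
    # read from the thread-local and passed along with the job itself.
    group = getattr(_props, "job_group", None)
    return (job, group)

def worker(group_id, results):
    set_job_group(group_id)
    results.append(submit_job("collect"))

results = []
threads = [threading.Thread(target=worker, args=(g, results)) for g in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each thread's job carries its own group, regardless of which JVM
# thread would eventually service the call.
print(sorted(results))  # [('collect', 'a'), ('collect', 'b')]
```

The headache mentioned above is that every job-submitting entry point (collect, foreach, and any user-defined path) would have to go through this single submit function to pick up the thread-local properties.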

      An alternative might be to use what py4j calls the "Single Threading Model" (https://www.py4j.org/advanced_topics.html#the-single-threading-model). I'd want to look more closely at the py4j implementation of how that works first.

      I can't think of any guaranteed workaround, but I think you can increase your chances of getting the desired behavior if you always set those properties just before each call to runJob. E.g., instead of

      # more likely to trigger the bug this way
      sc.setJobGroup("a", "description")  # note: PySpark's setJobGroup also requires a description
      
      rdd1.collect()  # or whatever other ways you submit a job
      rdd2.collect()
      # lots more stuff ...
      rddN.collect()
      

      change it to

      # slightly safer, but still no guarantees
      
      sc.setJobGroup("a", "description")
      rdd1.collect()  # or whatever other ways you submit a job
      sc.setJobGroup("a", "description")
      rdd2.collect()
      # lots more stuff ...
      sc.setJobGroup("a", "description")
      rddN.collect()
      
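To avoid sprinkling setJobGroup calls by hand, the workaround can be wrapped in a small helper (`with_job_group` is a hypothetical convenience function, not PySpark API; the stub context below exists only to demonstrate the call ordering without a JVM):

```python
def with_job_group(sc, group_id, description, action):
    # Re-apply the job group immediately before each action, maximizing
    # the chance the property lands on the thread that runs the job.
    sc.setJobGroup(group_id, description)
    return action()

# Stub SparkContext, just to show the ordering; with a real context you
# would call e.g. with_job_group(sc, "a", "my jobs", rdd1.collect).
class StubContext:
    def __init__(self):
        self.calls = []
    def setJobGroup(self, group_id, description):
        self.calls.append(("setJobGroup", group_id))

sc = StubContext()
result = with_job_group(sc, "a", "demo",
                        lambda: sc.calls.append(("runJob",)) or "done")
print(sc.calls)  # [('setJobGroup', 'a'), ('runJob',)]
```

This only narrows the window between setting the property and submitting the job; as noted above, it still offers no guarantee.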


      People

        • Assignee: Unassigned
        • Reporter: Imran Rashid (irashid)