Spark / SPARK-22541

DataFrames: applying multiple filters one after another using UDFs and accumulators results in faulty accumulator values


    Details

    • Type: Documentation
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: PySpark
    • Labels: None
    • Environment: pyspark 2.2.0, Ubuntu

    Description

      I'm using UDF filters and accumulators to keep track of filtered rows in DataFrames.

      If I apply multiple filters one after the other, they seem to be evaluated together rather than in sequence, which corrupts the accumulators I'm using to keep track of filtered data.

      example.py
      from pyspark.sql.functions import udf, col
      from pyspark.sql.types import BooleanType
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
      
      def __myfilter(val, acc):
          # keep rows with val < 2; count each row that gets filtered out
          if val < 2:
              return True
          acc.add(1)
          return False
      
      acc1 = sc.accumulator(0)
      acc2 = sc.accumulator(0)
      
      def myfilter1(val):
          return __myfilter(val, acc1)
      
      def myfilter2(val):
          return __myfilter(val, acc2)
      
      my_udf1 = udf(myfilter1, BooleanType())
      my_udf2 = udf(myfilter2, BooleanType())
      
      df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # |  b|   2|   2|
      # |  c|   3|   3|
      # +---+----+----+
      
      df = df.filter(my_udf1(col("val1")))
      # df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # +---+----+----+
      # expected acc1: 2
      # expected acc2: 0
      
      df = df.filter(my_udf2(col("val2")))
      # df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # +---+----+----+
      # expected acc1: 2
      # expected acc2: 0
      df.show()
      print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
      print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
      
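      A minimal sketch of a possible workaround, assuming the cause is that Spark evaluates the DataFrame lazily and may combine adjacent filter predicates, so both UDFs end up being evaluated against every original row: force materialization of the intermediate result between the two filters with cache() plus an action such as count(), so the second UDF only sees rows that survived the first filter. The cache()/count() pattern and the helper names below are my own suggestion, not an official fix.

      workaround.py
      from pyspark.sql.functions import udf, col
      from pyspark.sql.types import BooleanType
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
      
      acc1 = sc.accumulator(0)
      acc2 = sc.accumulator(0)
      
      def keep_and_count(val, acc):
          # keep rows with val < 2; count each row that gets filtered out
          if val < 2:
              return True
          acc.add(1)
          return False
      
      my_udf1 = udf(lambda v: keep_and_count(v, acc1), BooleanType())
      my_udf2 = udf(lambda v: keep_and_count(v, acc2), BooleanType())
      
      df = df.filter(my_udf1(col("val1")))
      df.cache()
      df.count()  # action: materializes the first filter, so acc1 is final here
      
      df = df.filter(my_udf2(col("val2")))  # evaluated only against the cached survivors
      df.show()
      print("acc1: %s" % acc1.value)  # 2
      print("acc2: %s" % acc2.value)  # 0
      
      Note that accumulator updates made inside transformations are not guaranteed to be applied exactly once if partitions are recomputed; cache() reduces but does not eliminate that risk. Alternatively, the two checks can be folded into a single UDF so there is only one filter and the question of predicate evaluation order never arises.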


    People

    • Assignee: Liang-Chi Hsieh (viirya)
    • Reporter: Janne K. Olesen (jko)
    • Votes: 0
    • Watchers: 6
