Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-22541

Dataframes: applying multiple filters one after another using udfs and accumulators results in faulty accumulators

    XMLWordPrintableJSON

Details

    • Documentation
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.2.0
    • 2.3.0
    • PySpark
    • None
    • pyspark 2.2.0, ubuntu

    Description

      I'm using udf filters and accumulators to keep track of filtered rows in dataframes.

      If I'm applying multiple filters one after the other, they seem to be executed in parallel, not in sequence, which messes with the accumulators i'm using to keep track of filtered data.

      example.py
      from pyspark.sql.functions import udf, col
      from pyspark.sql.types import BooleanType
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
      
      def __myfilter(val, acc):
          if val < 2:
              return True
          else:
              acc.add(1)
          return False
      
      acc1 = sc.accumulator(0)
      acc2 = sc.accumulator(0)
      
      def myfilter1(val):
          return __myfilter(val, acc1)
      
      def myfilter2(val):
          return __myfilter(val, acc2)
      
      my_udf1 = udf(myfilter1, BooleanType())
      my_udf2 = udf(myfilter2, BooleanType())
      
      df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # |  b|   2|   2|
      # |  c|   3|   3|
      # +---+----+----+
      
      df = df.filter(my_udf1(col("val1")))
      # df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # +---+----+----+
      # expected acc1: 2
      # expected acc2: 0
      
      df = df.filter(my_udf2(col("val2")))
      # df.show()
      # +---+----+----+
      # |key|val1|val2|
      # +---+----+----+
      # |  a|   1|   1|
      # +---+----+----+
      # expected acc1: 2
      # expected acc2: 0
      df.show()
      print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
      print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
      

      Attachments

        Activity

          People

            viirya L. C. Hsieh
            jko Janne K. Olesen
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: