Description
I'm using UDF filters and accumulators to keep track of filtered rows in DataFrames.
When I apply multiple filters one after the other, they seem to be evaluated together (in parallel) rather than in sequence, which breaks the accumulators I'm using to count the rows each filter removes.
example.py
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)],
                           ["key", "val1", "val2"])

def __myfilter(val, acc):
    if val < 2:
        return True
    else:
        acc.add(1)
        return False

acc1 = sc.accumulator(0)
acc2 = sc.accumulator(0)

def myfilter1(val):
    return __myfilter(val, acc1)

def myfilter2(val):
    return __myfilter(val, acc2)

my_udf1 = udf(myfilter1, BooleanType())
my_udf2 = udf(myfilter2, BooleanType())

df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# |  b|   2|   2|
# |  c|   3|   3|
# +---+----+----+

df = df.filter(my_udf1(col("val1")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0

df = df.filter(my_udf2(col("val2")))
# df.show()
# +---+----+----+
# |key|val1|val2|
# +---+----+----+
# |  a|   1|   1|
# +---+----+----+
# expected acc1: 2
# expected acc2: 0

df.show()
print("acc1: %s" % acc1.value)  # expected 2, is 2  OK
print("acc2: %s" % acc2.value)  # expected 0, is 2  !!!
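
My guess (an assumption on my part, not something I can confirm from the docs) is that the optimizer merges the two consecutive filters into a single filter, so both Python UDFs are evaluated together against every original row, which would explain acc2 ending up at 2. Inspecting the plan of the final DataFrame should show this:

df.explain()
# Assumption: the plan shows one combined Filter node containing both UDF
# predicates (with the Python UDFs extracted into a single evaluation step),
# rather than two Filter nodes applied in sequence.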
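
A possible workaround sketch, assuming merged filters really are the cause: insert a materialization point between the two filters so the optimizer cannot combine them. Fresh accumulators and UDFs (acc1b, acc2b, udf1b, udf2b are names I made up for this sketch) keep the counts independent of the run above:

# Recreate the original three-row DataFrame and start with zeroed accumulators.
df2 = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)],
                            ["key", "val1", "val2"])
acc1b = sc.accumulator(0)
acc2b = sc.accumulator(0)
udf1b = udf(lambda v: __myfilter(v, acc1b), BooleanType())
udf2b = udf(lambda v: __myfilter(v, acc2b), BooleanType())

step1 = df2.filter(udf1b(col("val1"))).cache()
step1.count()  # action: runs the first filter on its own; acc1b should now be 2
step2 = step1.filter(udf2b(col("val2")))
step2.count()  # second UDF should only see the cached, already-filtered rows
print("acc1b: %s" % acc1b.value)  # expected 2
print("acc2b: %s" % acc2b.value)  # expected 0

Even with such a barrier, the Spark programming guide only guarantees exactly-once accumulator updates for actions; updates made inside transformations like filter() can be re-applied if tasks are retried or cached partitions are recomputed, so the counts can still drift.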