Description
Hi,
The following code snippet runs 4-5 times slower in Apache Spark/PySpark 3.1.1 compared to Apache Spark/PySpark 3.0.2:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, count, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .getOrCreate()

Toys = spark.read \
    .parquet('./toys-cleaned').repartition(12)

# tokenize the text
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="all_words", pattern="\\W")
toys_with_words = regexTokenizer.transform(Toys)

# remove stop words
remover = StopWordsRemover(inputCol="all_words", outputCol="words")
toys_with_tokens = remover.transform(toys_with_words).drop("all_words")

all_words = toys_with_tokens.select(explode("words").alias("word"))

# group by, sort and limit to 50k
top50k = all_words.groupBy("word").agg(count("*").alias("total")) \
    .sort(col("total").desc()).limit(50000)

top50k.show()
Some debugging on my side revealed that in Spark/PySpark 3.0.2 the 12 partitions are respected: all 12 tasks are processed in parallel. In Spark/PySpark 3.1.1, however, even though there are still 12 tasks, 10 of them finish immediately and only 2 are actually processed. (I tried disabling a couple of configs that seemed related, but none of them helped.)
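For context, here is a minimal sketch of how the partition count can be inspected and the kind of adaptive-execution settings one might try toggling before re-running the aggregation. These are standard Spark SQL configs, but whether they are the ones involved in this regression is only an assumption on my side:

# sketch: verify that repartition(12) is still reflected in the DataFrame
print(toys_with_tokens.rdd.getNumPartitions())   # expected: 12

# sketch: disable adaptive execution / partition coalescing and re-run,
# then compare the number of concurrently running tasks in the Spark UI
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

top50k = all_words.groupBy("word").agg(count("*").alias("total")) \
    .sort(col("total").desc()).limit(50000)
top50k.show()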
Screenshot of Spark 3.1.1 tasks:
Screenshot of Spark 3.0.2 tasks:
For a longer discussion: Spark User List
You can reproduce this large performance difference between Spark 3.1.1 and Spark 3.0.2 by running the shared code against any dataset large enough to take longer than a minute. I'm not sure whether this is related to Spark SQL, to some config that exists in 3.x but only took effect in 3.1.1, or to .transform in Spark ML.
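As a rough reproduction harness (a sketch, assuming a parquet dataset with a reviewText column at ./toys-cleaned as in the snippet above), run the identical pipeline on 3.0.2 and 3.1.1 and compare the elapsed time together with the task view in the Spark UI:

import time

# time the final action as a rough proxy for the whole job
start = time.time()
top50k.show()
print(f"Elapsed: {time.time() - start:.1f} s")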