Description
Consider the following test case. We create a dataframe with 100 withColumn statements, then 100 more, then 100 more, then 100 more. Each time we do this it gets slower pretty drastically. If we sub in the optimized plan, we end up with drastically better performance.
Consider the following code:
val raw = sc.parallelize(Range(1, 100)).toDF val s1 = System.nanoTime() var mapped = Range(1, 100).foldLeft(raw) { (df, i) => df.withColumn(s"value${i}", df("value") + i) } val s2 = System.nanoTime() val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) => df.withColumn(s"value${i}_2", df("value") + i) } val s3 = System.nanoTime() val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) => df.withColumn(s"value${i}_3", df("value") + i) } val s4 = System.nanoTime() val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) => df.withColumn(s"value${i}_4", df("value") + i) } val s5 = System.nanoTime() val plan = mapped3.queryExecution.optimizedPlan val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, plan, org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema)) val s6 = System.nanoTime() val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) => df.withColumn(s"value${i}_4", df("value") + i) } val s7 = System.nanoTime() val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) => df.withColumn(s"value${i}_4", df("value") + i) } val s8 = System.nanoTime() val plan = mapped3.queryExecution.analyzed val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, plan, org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema)) val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) => df.withColumn(s"value${i}_4", df("value") + i) } val s9 = System.nanoTime() val secondsToNanos = 1000*1000*1000.0 val stage1 = (s2-s1)/secondsToNanos val stage2 = (s3-s2)/secondsToNanos val stage3 = (s4-s3)/secondsToNanos val stage4 = (s5-s4)/secondsToNanos val stage5 = (s6-s5)/secondsToNanos val stage6 = (s7-s6)/secondsToNanos val stage7 = (s8-s7)/secondsToNanos val stage8 = (s9-s8)/secondsToNanos println(s"First 100: ${stage1}") println(s"Second 100: ${stage2}") println(s"Third 100: ${stage3}") println(s"Fourth 100: ${stage4}") println(s"Fourth 100 Optimization time: ${stage5}") println(s"Fourth 100 Optimized ${stage6}") println(s"Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc: ${stage7}") println(s"Fourth selects: ${stage8}")
This results in the following performance:
First 100: 4.873489454 Second 100: 14.982028303 seconds Third 100: 38.775467952 seconds Fourth 100: 73.429119675 seconds Fourth 100 Optimization time: 1.777374175 seconds Fourth 100 Optimized 22.514489934 seconds Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc: 69.616112734 seconds Fourth 100 using analyzed plan: 67.641982709 seconds
Now, I suspect that we can't just sub in the optimized plan for the logical plan because we lose a bunch of information which may be useful for optimization later. But, I suspect there's something we can do in the case of Projects at least that might be useful.
Attachments
Issue Links
- is related to
-
SPARK-23223 Stacking dataset transforms performs poorly
- Resolved