Description
In ML pipelines, each transformer/estimator appends new columns to the input DataFrame. For example, it might produce DataFrames like the following columns: a, b, c, d, where a is from raw input, b = udf_b(a), c = udf_c(b), and d = udf_d(c). Some UDFs could be expensive. However, if we materialize c and d, udf_b, and udf_c are triggered twice, i.e., value c is not re-used.
It would be nice to detect this pattern and re-use intermediate values.
val input = sqlContext.range(10) val output = input.withColumn("x", col("id") + 1).withColumn("y", col("x") * 2) output.explain(true) == Parsed Logical Plan == 'Project [*,('x * 2) AS y#254] Project [id#252L,(id#252L + cast(1 as bigint)) AS x#253L] LogicalRDD [id#252L], MapPartitionsRDD[458] at range at <console>:30 == Analyzed Logical Plan == id: bigint, x: bigint, y: bigint Project [id#252L,x#253L,(x#253L * cast(2 as bigint)) AS y#254L] Project [id#252L,(id#252L + cast(1 as bigint)) AS x#253L] LogicalRDD [id#252L], MapPartitionsRDD[458] at range at <console>:30 == Optimized Logical Plan == Project [id#252L,(id#252L + 1) AS x#253L,((id#252L + 1) * 2) AS y#254L] LogicalRDD [id#252L], MapPartitionsRDD[458] at range at <console>:30 == Physical Plan == TungstenProject [id#252L,(id#252L + 1) AS x#253L,((id#252L + 1) * 2) AS y#254L] Scan PhysicalRDD[id#252L] Code Generation: true input: org.apache.spark.sql.DataFrame = [id: bigint] output: org.apache.spark.sql.DataFrame = [id: bigint, x: bigint, y: bigint]
Attachments
Issue Links
- is duplicated by
-
SPARK-11990 DataFrame recompute UDF in some situation.
- Resolved
- links to