Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Description
The cause is this: when we call the RDD API inside a SparkPlan, we are very likely to reference the SparkPlan in the closure, and thus serialize and transfer a whole SparkPlan tree to the executor side. When the tree is deserialized there, the accumulators in the child SparkPlans are also deserialized and registered, and they always report a zero value.
This is not a problem currently, because we only have one operation to aggregate the accumulators: add. However, if we want to support more complex metrics such as min, the extra zero values will lead to wrong results.
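To make the difference between add and min concrete, here is a minimal sketch in plain Scala. It does not use the Spark accumulator API; the object name and the update values are hypothetical and only stand in for per-task metric updates, with the zeros representing accumulators that were registered on the executor but never updated.

```scala
// Hypothetical illustration only: plain Scala collections, not Spark accumulators.
object ZeroValueDemo {
  // Updates from accumulators that actually measured something.
  val realUpdates: Seq[Long] = Seq(2L, 3L)
  // Updates from accumulators that were deserialized and registered but never touched.
  val spuriousUpdates: Seq[Long] = Seq(0L, 0L)
  // What the driver would see if both kinds are reported.
  val reported: Seq[Long] = realUpdates ++ spuriousUpdates

  // add: zero is the identity, so the spurious updates are harmless.
  val sumReal: Long = realUpdates.sum
  val sumReported: Long = reported.sum

  // min: zero is a valid value, so the spurious updates change the result.
  val minReal: Long = realUpdates.min
  val minReported: Long = reported.min
}
```

Here the two sums agree, while the reported min collapses to 0 instead of 2.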
Take TungstenAggregate as an example. I logged "stageId, partitionId, accumName, accumId" whenever an accumulator was deserialized and registered, and logged the "accumId -> accumValue" map when a task ended. The output is:
scala> val df = Seq(1 -> "a", 2 -> "b").toDF("a", "b").groupBy().count()
df: org.apache.spark.sql.DataFrame = [count: bigint]
scala> df.collect
register: 0 0 Some(number of input rows) 4
register: 0 0 Some(number of output rows) 5
register: 1 0 Some(number of input rows) 4
register: 1 0 Some(number of output rows) 5
register: 1 0 Some(number of input rows) 2
register: 1 0 Some(number of output rows) 3
Map(5 -> 1, 4 -> 2, 6 -> 4458496)
Map(5 -> 0, 2 -> 1, 7 -> 4458496, 3 -> 1, 4 -> 0)
res0: Array[org.apache.spark.sql.Row] = Array([2])
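The extra registrations in this log follow from how closures capture their enclosing object. The sketch below reproduces the mechanism with plain Java serialization and no Spark dependency. `PlanNode`, `registered`, and `roundTrip` are hypothetical names: `PlanNode` stands in for a SparkPlan, and the counter incremented in `readObject` stands in for accumulator registration on deserialization.

```scala
import java.io._
import java.util.concurrent.atomic.AtomicInteger

object CaptureDemo {
  // Stand-in for "an accumulator was deserialized and registered".
  val registered = new AtomicInteger(0)

  // Stand-in for a SparkPlan node with an optional child.
  class PlanNode(val name: String, val child: Option[PlanNode]) extends Serializable {
    // The closure reads a member of this node, so it captures `this`,
    // and with it the whole subtree reachable through `child`.
    def closure: () => String = () => name

    // Java serialization hook: runs once for every node that is deserialized.
    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      registered.incrementAndGet()
    }
  }

  // Serialize and deserialize a value, as happens when a task is shipped to an executor.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Round-tripping `new PlanNode("parent", Some(new PlanNode("child", None))).closure` should bump the counter once per node, even though the closure itself only needs the parent's name.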
The best choice is to avoid serializing and deserializing a SparkPlan tree at all, which can be achieved with LocalNode.
Or we can work around the serialization problem in the affected SparkPlans, such as TungstenAggregate and TungstenSort.
Or we can improve the SQL metrics framework to make it more robust in this case.
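One common shape for the per-operator workaround is to copy the needed fields into local values before building the closure, so the closure no longer references the enclosing node. The sketch below shows that pattern with plain Java serialization. It is an assumption about what such a workaround could look like, not the fix that was applied in Spark; `Node`, `deserialized`, and `roundTrip` are hypothetical names.

```scala
import java.io._
import java.util.concurrent.atomic.AtomicInteger

object LocalCopyDemo {
  // Counts how many nodes were deserialized (stand-in for accumulator registration).
  val deserialized = new AtomicInteger(0)

  class Node(val name: String, val child: Option[Node]) extends Serializable {
    // Captures `this`: the whole subtree travels with the closure.
    def capturingClosure: () => String = () => name

    // Copies the field into a local value first, so only the String is captured.
    def localCopyClosure: () => String = {
      val localName = name
      () => localName
    }

    // Java serialization hook: runs once for every node that is deserialized.
    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      deserialized.incrementAndGet()
    }
  }

  // Serialize and deserialize a value through an in-memory buffer.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

With `localCopyClosure`, no `Node` should be deserialized on the receiving side, so nothing is registered a second time. The trade-off is that every closure in the operator has to be written this way, which is why the other two options address the problem more generally.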