Details
Description
I've noticed that accumulators from the Spark 1.x API no longer work with Spark 2.x, failing with
java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
It happens while serializing an accumulator here:

val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
...although copyAndReset does return a zero-value copy; just consider the accumulator below:
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2)
}
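For context, a minimal sketch of how such an accumulator would be driven from a job; the SparkContext named sc and the sample data are my assumptions, and the failure surfaces once the wrapped accumulator is serialized with the task:

import java.{lang => jl}

// Assumption: an existing SparkContext named sc, and concatParam as defined above.
val acc = sc.accumulator(new jl.StringBuilder())(concatParam)

sc.parallelize(Seq("a", "b", "c"))
  .foreach(s => acc += new jl.StringBuilder(s))
// Expected to fail with: java.lang.AssertionError: assertion failed:
//   copyAndReset must return a zero value copy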
So Spark treats the zero value as non-zero because of how isZero is implemented in LegacyAccumulatorWrapper:
override def isZero: Boolean = _value == param.zero(initialValue)
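That check fails here because java.lang.StringBuilder does not override equals, so == falls back to reference equality and two freshly built zero values never compare equal. A standalone illustration (my own snippet, not Spark code):

val zero1 = new java.lang.StringBuilder()
val zero2 = new java.lang.StringBuilder()

zero1 == zero2                      // false: Object.equals, i.e. reference equality
zero1.toString == zero2.toString    // true: both are empty strings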
All this means that the values being accumulated must implement equals and hashCode; otherwise the comparison falls back to reference equality on two distinct zero instances and isZero will almost always return false.
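For contrast, a sketch of the same kind of param over a value type with structural equality (String, purely illustrative) would pass the check, because param.zero compares equal to the value produced by reset:

import org.apache.spark.AccumulatorParam

// Illustrative only: String implements equals/hashCode, so the wrapper's
// isZero ends up comparing "" == "" and returns true.
val stringConcatParam = new AccumulatorParam[String] {
  override def zero(initialValue: String): String = ""
  override def addInPlace(r1: String, r2: String): String = r1 + r2
}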
So I'm wondering whether the assertion
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
is really necessary and whether it can be safely removed?
If not, is it OK to just override writeReplace in LegacyAccumulatorWrapper to prevent such failures?