For groupByKey and join transformations, Spark tasks on the reduce side deserialize every record into a Java object before calling any user function.
This puts heavy pressure on the garbage collector: when enough data is aggregated, objects live long enough to be promoted out of the young generation and trigger full GCs down the line. Additionally, when records are spilled, they must be serialized and deserialized multiple times.
It would be helpful to allow aggregations directly on serialized data, using some sort of RawHasher interface that implements hashCode and equals for serialized records. This would also require encoding record boundaries in the serialized format, which I'm not sure we currently do.
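
To make the idea concrete, here is a rough sketch of what such an interface might look like. Everything in it is hypothetical: the RawHasher method signatures, BytewiseHasher, RawKey, frame and countKeys are illustrative names, not existing Spark APIs, and the 4-byte length prefix is just one assumed way to encode record boundaries. The byte-wise hasher is only correct if equal keys always serialize to identical bytes, which not every serializer guarantees.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these names exist in Spark.
final class RawAggregationSketch {

    /** Hashing and equality defined over serialized bytes instead of objects. */
    interface RawHasher {
        int hashCode(byte[] buf, int offset, int length);
        boolean equals(byte[] a, int aOff, int aLen, byte[] b, int bOff, int bLen);
    }

    /** Simplest case: assumes equal keys always serialize to identical bytes. */
    static final class BytewiseHasher implements RawHasher {
        @Override
        public int hashCode(byte[] buf, int offset, int length) {
            int h = 1;
            for (int i = offset; i < offset + length; i++) {
                h = 31 * h + buf[i];
            }
            return h;
        }

        @Override
        public boolean equals(byte[] a, int aOff, int aLen, byte[] b, int bOff, int bLen) {
            if (aLen != bLen) {
                return false;
            }
            for (int i = 0; i < aLen; i++) {
                if (a[aOff + i] != b[bOff + i]) {
                    return false;
                }
            }
            return true;
        }
    }

    /** A slice of a serialized buffer, usable as a HashMap key without deserializing. */
    static final class RawKey {
        final byte[] buf;
        final int offset;
        final int length;
        final RawHasher hasher;

        RawKey(byte[] buf, int offset, int length, RawHasher hasher) {
            this.buf = buf;
            this.offset = offset;
            this.length = length;
            this.hasher = hasher;
        }

        @Override
        public int hashCode() {
            return hasher.hashCode(buf, offset, length);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RawKey)) {
                return false;
            }
            RawKey k = (RawKey) o;
            return hasher.equals(buf, offset, length, k.buf, k.offset, k.length);
        }
    }

    /** Assumed framing: each record is a 4-byte big-endian length followed by its payload. */
    static byte[] frame(byte[]... records) {
        int total = 0;
        for (byte[] r : records) {
            total += 4 + r.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] r : records) {
            out.putInt(r.length).put(r);
        }
        return out.array();
    }

    /** Counts occurrences of each serialized key; keys stay as slices of the input buffer. */
    static Map<RawKey, Integer> countKeys(byte[] stream, RawHasher hasher) {
        Map<RawKey, Integer> counts = new HashMap<>();
        ByteBuffer in = ByteBuffer.wrap(stream);
        while (in.remaining() >= 4) {
            int len = in.getInt();
            int off = in.position();
            counts.merge(new RawKey(stream, off, len, hasher), 1, Integer::sum);
            in.position(off + len); // skip the payload to reach the next length prefix
        }
        return counts;
    }
}
```

In this sketch the only allocation per record is a small RawKey wrapper pointing into the shared buffer, rather than a fully deserialized key object. A real implementation would presumably avoid even that by using a hash table laid out over the raw bytes, and would need a hasher aware of the serializer's encoding wherever byte-wise equality is not sufficient.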