The references in the heap dump show that the POUserFunc in a plan (except the oldest one?) has a reference to Reducer$Context (POUserFunc -> udf object -> ProgressableReporter -> Reducer$Context). But the Reducer$Context object has a reference to PigCombiner$Combine, which has a reference to another (previously created?) PhysicalPlan. So every combiner PhysicalPlan instance that has been created in the map task remains referenced and can't be freed by GC.
I haven't followed the exact call sequence that leads to this, but it looks like each PhysicalPlan instance is created with a reference to a copy of the previous Reducer$Context, and since Reducer$Context is an inner class of PigCombiner$Combine (a subclass of Reducer), it holds a hidden reference (this$0) to that older PigCombiner$Combine instance. That older PigCombiner$Combine in turn references the old PhysicalPlan, the old PhysicalPlan references an even older Reducer$Context, and so on.
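The this$0 link is the key to the chain. A minimal, self-contained sketch (class names here are stand-ins, not the actual Pig classes) showing that a non-static inner class instance pins its enclosing instance, and everything that instance references:

```java
public class InnerClassRetention {
    static class Plan {
        // stands in for a (possibly large) PhysicalPlan
        byte[] payload = new byte[1024];
    }

    static class Combine {          // stands in for PigCombiner$Combine
        Plan plan = new Plan();     // the reference the leak keeps alive

        class Context { }           // non-static: compiler adds a this$0 field

        Context newContext() { return new Context(); }
    }

    public static void main(String[] args) throws Exception {
        Combine combine = new Combine();
        Combine.Context ctx = combine.newContext();
        combine = null;             // drop our own reference to the Combine

        // ctx still reaches the Combine (and its Plan) through the
        // compiler-generated this$0 field, so neither can be collected
        // while ctx is live.
        java.lang.reflect.Field outer =
            ctx.getClass().getDeclaredField("this$0");
        outer.setAccessible(true);
        Combine reached = (Combine) outer.get(ctx);
        System.out.println(reached.plan != null); // prints "true"
    }
}
```

Anything holding one of these Context objects (here, the POUserFunc via ProgressableReporter) therefore transitively holds a whole PhysicalPlan.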
To break this chain, this patch clears the references to the PhysicalPlan in PigCombiner$Combine when the cleanup method is called.
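The essence of the fix can be sketched as follows. This is a simplified, hypothetical model (not the actual patch; names are illustrative): cleanup() nulls the plan field, so a stale context that still reaches the old Combine via this$0 no longer pins the plan. A WeakReference shows the plan becoming collectable:

```java
import java.lang.ref.WeakReference;

public class CleanupSketch {
    static class Plan { byte[] payload = new byte[1 << 20]; }

    static class Combine {
        Plan plan = new Plan();
        class Context { }                 // holds this$0 -> this Combine
        Context ctx = new Context();
        void cleanup() { plan = null; }   // the essence of the patch
    }

    public static void main(String[] args) throws InterruptedException {
        Combine combine = new Combine();
        Combine.Context stale = combine.ctx;   // simulates the old context kept alive
        WeakReference<Plan> ref = new WeakReference<>(combine.plan);

        combine.cleanup();                // patch: drop the plan reference
        combine = null;

        // 'stale' still reaches the old Combine via this$0, but the
        // Plan itself is now only weakly reachable and can be freed.
        for (int i = 0; i < 10 && ref.get() != null; i++) {
            System.gc();
            Thread.sleep(50);
        }
        System.out.println(ref.get() == null && stale != null);
    }
}
```

The old Combine objects themselves may still linger, but they become small once their plan references are cleared.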
I had a look at the Hadoop mapreduce code that does the sort-and-spill of map output (org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.sortAndSpill()), and it looks like one combiner class instance is created for every partition (i.e. reducer).
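A rough model of that behavior (this is a simplified sketch, not the actual Hadoop source): each spill iterates over the partitions and instantiates a fresh combiner, and hence a fresh PhysicalPlan, for each one.

```java
import java.util.ArrayList;
import java.util.List;

public class SpillSketch {
    static class Combiner { }   // stands in for PigCombiner$Combine

    // Simplified model of sortAndSpill(): one new combiner instance
    // per partition on every spill.
    static List<Combiner> spill(int partitions) {
        List<Combiner> created = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            created.add(new Combiner()); // fresh instance for this partition
        }
        return created;
    }

    public static void main(String[] args) {
        int partitions = 300;  // one partition per reducer
        System.out.println(spill(partitions).size()); // 300
    }
}
```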
In the case of the query whose map tasks ran out of memory, mapred.reduce.tasks was set to 300, i.e. 300 instances of the combiner class, and therefore 300 instances of the physical plan, are created for every spill. The query in 0.8 also had several spills (10+), which means more than 3000 instances of PhysicalPlan were lying around. The physical plans in this case were also large because it was a 'multi-query', and 17 MR jobs were merged into 1.
i.e., the failure can happen in any query that uses the combiner. There just needs to be a large number of PhysicalPlan instances, where number of PhysicalPlan instances = number-of-reducers * number-of-spills. The larger the PhysicalPlan, the fewer instances are needed for failure.
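As a back-of-the-envelope check of the numbers above:

```java
public class PlanCount {
    public static void main(String[] args) {
        int reducers = 300;  // mapred.reduce.tasks in the failing query
        int spills = 10;     // lower bound of the observed "10+" spills
        // leaked PhysicalPlan instances = reducers * spills
        System.out.println(reducers * spills); // 3000
    }
}
```

With 17 merged MR jobs inflating each plan, 3000+ retained copies is easily enough to exhaust a map task's heap.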