Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
0.12.0
-
None
-
None
-
Patch Available
-
Reviewed
Description
If the final two keys in each relation join, the resulting joined records never make it to the final output. The reason is that POMergeJoin does a read-ahead, while POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput == true: it flushes its own buffered bag and returns immediately. Because processInput is skipped at that point, POMergeJoin never sees endOfAllInput == true, which prevents the final join from being output.
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java index c355d1d..8fd44fa 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator { @Override public Result getNextTuple() throws ExecException { - // Since the output is buffered, we need to flush the last - // set of records when the close method is called by mapper. - if (this.parentPlan.endOfAllInput) { - if (outputBag != null) { - Tuple tup = mTupleFactory.newTuple(2); - tup.set(0, prevKey); - tup.set(1, outputBag); - outputBag = null; - return new Result(POStatus.STATUS_OK, tup); - } - - return new Result(POStatus.STATUS_EOP, null); - } + Result inp = null; Result res = null; while (true) { inp = processInput(); + if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) { - break; + // Since the output is buffered, we need to flush the last + // set of records when the close method is called by mapper. + if (this.parentPlan.endOfAllInput) { + if (outputBag != null) { + Tuple tup = mTupleFactory.newTuple(2); + tup.set(0, prevKey); + tup.set(1, outputBag); + outputBag = null; + return new Result(POStatus.STATUS_OK, tup); + } + + return new Result(POStatus.STATUS_EOP, null); + } else + break; } if (inp.returnStatus == POStatus.STATUS_NULL) {