  Pig / PIG-4166

Collected group drops last record when combined with merge join


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12.0
    • Fix Version/s: 0.14.0
    • Component/s: None
    • Labels: None
    • Patch Info: Patch Available
    • Hadoop Flags: Reviewed

    Description

      If the final keys of the two relations join (that is, the last key in each relation matches), the joined records for that key never make it to the final output. The cause is that POMergeJoin reads ahead, while POCollectedGroup stops calling processInput() once this.parentPlan.endOfAllInput == true. Because POCollectedGroup no longer pulls from its input at that point, POMergeJoin never sees endOfAllInput == true and never emits the final join result.
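      To make the mechanism concrete, here is a minimal Java sketch under simplifying assumptions; it does not use Pig's classes. Plan, Op, ReadAheadJoin, Group and run() are hypothetical stand-ins. ReadAheadJoin imitates only the read-ahead (it holds back the latest record until the next one arrives or endOfAllInput is set), its input is assumed to be the merge join's output already reduced to a sorted list of keys, and Group emits "key:count" instead of a (key, bag) tuple. The driver feeds one record per map() call, then sets endOfAllInput for a final pass, roughly as the mapper's close does. With patched = false the group checks endOfAllInput before pulling input (the pre-patch ordering); with patched = true it flushes only after its input reports EOP (the ordering the patch introduces).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the PIG-4166 interaction. Plan, Op, ReadAheadJoin, Group and run()
// are hypothetical stand-ins, NOT Pig's physical-operator API.
final class CollectedGroupReadAhead {
    static final String EOP = null; // plays the role of POStatus.STATUS_EOP

    static final class Plan { boolean endOfAllInput; }

    interface Op { String next(); }

    // Stand-in for POMergeJoin's read-ahead: the latest record is held back
    // until the next one arrives, or released once endOfAllInput is true.
    static final class ReadAheadJoin implements Op {
        private final Plan plan;
        private final Deque<String> attached = new ArrayDeque<>();
        private String pending;
        ReadAheadJoin(Plan plan) { this.plan = plan; }
        void attach(String rec) { attached.add(rec); }

        public String next() {
            if (!attached.isEmpty()) {
                String out = pending; // EOP on the very first record
                pending = attached.poll();
                return out;
            }
            if (plan.endOfAllInput && pending != null) {
                String out = pending;
                pending = null;
                return out;
            }
            return EOP;
        }
    }

    // Stand-in for POCollectedGroup: buffers runs of equal keys, emits "key:count".
    static final class Group implements Op {
        private final Op input;
        private final Plan plan;
        private final boolean patched;
        private String prevKey;
        private int count; // size of the buffered bag; 0 means no bag

        Group(Op input, Plan plan, boolean patched) {
            this.input = input;
            this.plan = plan;
            this.patched = patched;
        }

        private String flush() {
            String out = prevKey + ":" + count;
            count = 0;
            return out;
        }

        public String next() {
            // Pre-patch ordering: at end of all input, flush and stop WITHOUT
            // pulling from the input, so the read-ahead is never drained.
            if (!patched && plan.endOfAllInput) {
                return count > 0 ? flush() : EOP;
            }
            while (true) {
                String key = input.next();
                if (key == EOP) {
                    // Patched ordering: flush the last bag only after the input reports EOP.
                    return (plan.endOfAllInput && count > 0) ? flush() : EOP;
                }
                if (count > 0 && !key.equals(prevKey)) {
                    String out = flush(); // key changed: emit the finished group
                    prevKey = key;
                    count = 1;
                    return out;
                }
                prevKey = key;
                count++;
            }
        }
    }

    static List<String> run(List<String> joined, boolean patched) {
        Plan plan = new Plan();
        ReadAheadJoin join = new ReadAheadJoin(plan);
        Group group = new Group(join, plan, patched);
        List<String> out = new ArrayList<>();
        for (String rec : joined) { // one map() call per input record
            join.attach(rec);
            drain(group, out);
        }
        plan.endOfAllInput = true; // mapper close(): final flush pass
        drain(group, out);
        return out;
    }

    static void drain(Op op, List<String> out) {
        for (String r = op.next(); r != EOP; r = op.next()) {
            out.add(r);
        }
    }
}
```

      For example, run(List.of("a", "a", "b"), false) returns [a:2]: the b held by the read-ahead is never pulled once endOfAllInput is set, which matches the dropped-last-record symptom. With patched = true the same input returns [a:2, b:1].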

      diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
      index c355d1d..8fd44fa 100644
      --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
      +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
      @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
           @Override
           public Result getNextTuple() throws ExecException {
       
      -        // Since the output is buffered, we need to flush the last
      -        // set of records when the close method is called by mapper.
      -        if (this.parentPlan.endOfAllInput) {
      -            if (outputBag != null) {
      -                Tuple tup = mTupleFactory.newTuple(2);
      -                tup.set(0, prevKey);
      -                tup.set(1, outputBag);
      -                outputBag = null;
      -                return new Result(POStatus.STATUS_OK, tup);
      -            }
      -
      -            return new Result(POStatus.STATUS_EOP, null);
      -        }
      +        
       
               Result inp = null;
               Result res = null;
       
               while (true) {
                   inp = processInput();
      +
                   if (inp.returnStatus == POStatus.STATUS_EOP ||
                           inp.returnStatus == POStatus.STATUS_ERR) {
      -                break;
      +                // Since the output is buffered, we need to flush the last
      +                // set of records when the close method is called by mapper.
      +                if (this.parentPlan.endOfAllInput) {
      +                    if (outputBag != null) {
      +                        Tuple tup = mTupleFactory.newTuple(2);
      +                        tup.set(0, prevKey);
      +                        tup.set(1, outputBag);
      +                        outputBag = null;
      +                        return new Result(POStatus.STATUS_OK, tup);
      +                    }
      +
      +                    return new Result(POStatus.STATUS_EOP, null);
      +                } else
      +                    break;
                   }
       
                   if (inp.returnStatus == POStatus.STATUS_NULL) {
      

      Attachments

        1. patch (5 kB, Brian Johnson)
        2. PIG-4166-2.patch (4 kB, Daniel Dai)


          People

            Assignee: Brian Johnson (bridiver)
            Reporter: Brian Johnson (bridiver)
            Votes: 0
            Watchers: 3
