Pig / PIG-2462

getWrappedSplit is incorrectly returning the first split instead of the current split.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.1, 0.11
    • Fix Version/s: 0.9.2, 0.10.0, 0.11
    • Component/s: None
    • Labels:
      None
    • Patch Info:
      Patch Available
    • Hadoop Flags:
      Reviewed

      Description

      If your loader needs to know which file is currently being read (for example, to get schema information), Pig currently provides this by calling prepareToRead every time a new split is read. This is critical for CombinedInputFormat, since each mapper can read more than one file. For the load function to know which file is currently being read, it should call getWrappedSplit(). However, getWrappedSplit always returns the first split in the list. Code from PigSplit.java:

      /**
       * This method returns the actual InputSplit (as returned by the
       * {@link InputFormat}) which this class is wrapping.
       *
       * @return the wrappedSplit
       */
      public InputSplit getWrappedSplit() {
          return wrappedSplits[0];
      }
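To show why a loader always sees the first file, here is a minimal, self-contained model of the behavior. ModelPigSplit, ModelLoader and BuggyReadDemo are hypothetical stand-ins, not Pig classes. File names replace real InputSplits, and the reader loop is simplified: it notifies the loader for every chunk, including the first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PigSplit: wraps several underlying splits,
// represented here by file names.
class ModelPigSplit {
    private final String[] wrappedSplits;

    ModelPigSplit(String... wrappedSplits) {
        this.wrappedSplits = wrappedSplits;
    }

    int getNumPaths() {
        return wrappedSplits.length;
    }

    // Used by the record reader to open a specific chunk.
    String getWrappedSplit(int idx) {
        return wrappedSplits[idx];
    }

    // Mirrors the accessor quoted above: always the first wrapped split.
    String getWrappedSplit() {
        return wrappedSplits[0];
    }
}

// Hypothetical loader that asks the split which file it is about to read.
class ModelLoader {
    final List<String> seen = new ArrayList<>();

    void prepareToRead(ModelPigSplit split) {
        seen.add(split.getWrappedSplit());
    }
}

class BuggyReadDemo {
    // Simplified reader loop: opens each chunk in turn, then notifies the loader.
    // Records the chunks actually opened in `opened`; returns what the loader saw.
    static List<String> readAll(ModelPigSplit split, List<String> opened) {
        ModelLoader loader = new ModelLoader();
        for (int idx = 0; idx < split.getNumPaths(); idx++) {
            opened.add(split.getWrappedSplit(idx)); // the reader uses the right chunk...
            loader.prepareToRead(split);            // ...but the loader cannot tell which one
        }
        return loader.seen;
    }

    public static void main(String[] args) {
        List<String> opened = new ArrayList<>();
        List<String> seen = readAll(new ModelPigSplit("part-0", "part-1", "part-2"), opened);
        System.out.println(opened); // [part-0, part-1, part-2]
        System.out.println(seen);   // [part-0, part-0, part-0]
    }
}
```

The reader walks all three chunks, but every prepareToRead call observes part-0. A loader that keys its schema on the file name would therefore apply the first file's schema to every file in the combined split.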

      Furthermore, in PigRecordReader.java the splitIndex is never incremented when moving from one split to the next. So even if getWrappedSplit() were changed to return wrappedSplits[splitIndex], it would still return the wrong split.

      This can be fixed by changing PigRecordReader to increment PigSplit.splitIndex every time the split changes, in the following code:

      /**
       * Get the record reader for the next chunk in this CombineFileSplit.
       */
      protected boolean initNextRecordReader() throws IOException, InterruptedException {

          if (curReader != null) {
              curReader.close();
              curReader = null;
              if (idx > 0) {
                  progress += pigSplit.getLength(idx-1); // done processing so far
              }
          }

          // if all chunks have been processed, nothing more to do.
          if (idx == pigSplit.getNumPaths()) {
              return false;
          }

          // get a record reader for the idx-th chunk
          try {
              curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
              LOG.info("Current split being processed "+pigSplit.getWrappedSplit(idx));

              if (idx > 0) {
                  // initialize() for the first RecordReader will be called by MapTask;
                  // we're responsible for initializing subsequent RecordReaders.
                  curReader.initialize(pigSplit.getWrappedSplit(idx), context);
                  pigSplit.get
                  loadfunc.prepareToRead(curReader, pigSplit);
              }
          } catch (Exception e) {
              throw new RuntimeException(e);
          }

          idx++;
          return true;
      }
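The bookkeeping in initNextRecordReader can be modeled without Hadoop to make one point concrete: idx is advanced only inside the reader and is never written back to the split. ChunkCursor is a hypothetical class. Its chunk lengths stand in for pigSplit.getLength(i), and a boolean stands in for curReader != null.

```java
// Hypothetical model of the bookkeeping in PigRecordReader.initNextRecordReader().
// Chunk lengths stand in for pigSplit.getLength(i); no Hadoop classes are involved.
class ChunkCursor {
    private final long[] chunkLengths;
    private int idx = 0;                 // private to the reader, as in PigRecordReader
    private boolean readerOpen = false;  // stands in for curReader != null
    private long progress = 0;

    ChunkCursor(long... chunkLengths) {
        this.chunkLengths = chunkLengths;
    }

    /** Opens a reader for the next chunk; returns false once all chunks are done. */
    boolean initNextRecordReader() {
        if (readerOpen) {
            readerOpen = false;                    // curReader.close()
            if (idx > 0) {
                progress += chunkLengths[idx - 1]; // previous chunk fully consumed
            }
        }
        if (idx == chunkLengths.length) {
            return false;                          // nothing more to do
        }
        readerOpen = true;                         // createRecordReader(getWrappedSplit(idx))
        idx++;                                     // only the reader advances idx
        return true;
    }

    long getProgress() {
        return progress;
    }

    public static void main(String[] args) {
        ChunkCursor cursor = new ChunkCursor(10, 20, 30);
        int opened = 0;
        while (cursor.initNextRecordReader()) {
            opened++;
        }
        System.out.println(opened + " chunks, progress " + cursor.getProgress()); // 3 chunks, progress 60
    }
}
```

Progress for a chunk is only counted when the next call closes that chunk's reader, so the total reaches the full length on the final call that returns false. Because idx is a private field of the reader, anything that asks the split which chunk is current has no way to find out.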

      Attachments

      1. splitsfix.patch (3 kB, Alex Rovner)
      2. split_fix_take2.patch (3 kB, Alex Rovner)
      3. PIG-2462-2.patch (7 kB, Daniel Dai)
      4. PIG-2462-2_0.9.patch (6 kB, Daniel Dai)

          Activity

          Daniel Dai added a comment -

          Yes. However, we don't plan another 0.8 release. Can you apply the patch to the 0.8 branch and build it yourself?

          Yulia Tolskaya added a comment -

          This bug also exists in Pig 0.8.

          Daniel Dai added a comment -

          +1 for patch.

          Unit test pass.

          test-patch:
          [exec] -1 overall.
          [exec]
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec]
          [exec] +1 tests included. The patch appears to include 3 new or modified tests.
          [exec]
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec]
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec]
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec]
          [exec] -1 release audit. The applied patch generated 510 release audit warnings (more than the trunk's current 502 warnings).

          All new files have the Apache header, so the release audit warning can be ignored.

          Patch committed to 0.9/0.10/trunk.

          Thanks Alex!

          Daniel Dai added a comment -

          PIG-2462-2_0.9.patch is the same patch for the 0.9 branch.

          Daniel Dai added a comment -

          I may have overthought this. We don't need an InputFormat in the test, only a LoadFunc. I attached a patch with a test case.

          Daniel Dai added a comment -

          You will need to write an InputFormat and a LoadFunc, and use PigUnit to invoke this LoadFunc. Unfortunately, I cannot find a sample with a custom InputFormat in the existing tests.

          Alex Rovner added a comment -

          Is it possible to use CombinedInputFormat in PigUnit?
          Is there an existing test you can point me to as an example?

          Daniel Dai added a comment -

          Patch looks good. Testing is a little complex, but possible. We need to add a test case.

          Alex Rovner added a comment -

          Attached the changes based on the comments

          Daniel Dai added a comment -

          splitIndex within the PigInputFormat tracks the current PigSplit correct?

          Yes

          What does splitIndex within the PigSplit track? (From my understanding it should track the current wrapped InputSplit)

          It is how a PigSplit identifies itself.

          There is also inputIndex within PigSplit. Wouldn't that track the InputSplit index?

          If a MapReduce job needs more than one input (e.g., join a, b, where inputs a and b are read in the same map), inputIndex tracks which input it is.

          Alex Rovner added a comment -

          Thanks Daniel for the info.

          Some questions:
          splitIndex within the PigInputFormat tracks the current PigSplit correct?
          What does splitIndex within the PigSplit track? (From my understanding it should track the current wrapped InputSplit)
          There is also inputIndex within PigSplit. Wouldn't that track the InputSplit index?

          Finally, do we need to introduce an "idx" in PigSplit, or would my patch suffice?

          Daniel Dai added a comment -

          Hi, Alex,
          We have two levels of split: PigSplit and InputSplit. A PigSplit wraps several InputSplits; in PigInputFormat, we combine multiple InputSplits into one PigSplit. splitIndex tracks the current PigSplit, and idx tracks the current InputSplit within the PigSplit.
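The two split levels can be sketched as a small model. TwoLevelPigSplit and SplitCombiner are hypothetical. They group InputSplits by count, which simplifies Pig's actual combining because Pig combines by size. As in the getPigSplits loop quoted elsewhere in this thread, splitIndex is simply the PigSplit's position in the list. idx is the position of an InputSplit inside one PigSplit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two split levels: one PigSplit wraps several InputSplits.
class TwoLevelPigSplit {
    final int splitIndex;           // which PigSplit this is within the job
    final List<String> inputSplits; // wrapped InputSplits; list position = idx

    TwoLevelPigSplit(int splitIndex, List<String> inputSplits) {
        this.splitIndex = splitIndex;
        this.inputSplits = inputSplits;
    }
}

class SplitCombiner {
    // Groups consecutive InputSplits, at most maxPerPigSplit (> 0) per PigSplit.
    // Simplification: this groups by count, not by size.
    static List<TwoLevelPigSplit> combine(List<String> inputSplits, int maxPerPigSplit) {
        List<List<String>> combined = new ArrayList<>();
        for (int start = 0; start < inputSplits.size(); start += maxPerPigSplit) {
            int end = Math.min(start + maxPerPigSplit, inputSplits.size());
            combined.add(new ArrayList<>(inputSplits.subList(start, end)));
        }
        // As in the getPigSplits loop: splitIndex is the loop counter i.
        List<TwoLevelPigSplit> pigSplits = new ArrayList<>();
        for (int i = 0; i < combined.size(); i++) {
            pigSplits.add(new TwoLevelPigSplit(i, combined.get(i)));
        }
        return pigSplits;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("f0", "f1", "f2", "f3", "f4");
        for (TwoLevelPigSplit ps : combine(files, 2)) {
            for (int idx = 0; idx < ps.inputSplits.size(); idx++) {
                System.out.println("splitIndex=" + ps.splitIndex + " idx=" + idx
                        + " -> " + ps.inputSplits.get(idx));
            }
        }
    }
}
```

With five inputs and two per PigSplit, f3 is identified by splitIndex 1 and idx 1. A single index cannot serve both purposes.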

          Alex Rovner added a comment -

          One way to avoid this issue is to disable the combined input format in your Pig jobs.

          I guess I am a bit confused about the comments on splitIndex, as I am not very familiar with Pig's code base. Is splitIndex used elsewhere, and not really meant to track the index of the current PigSplit that we are reading? If so, I can certainly change the patch to add another variable, "idx", as suggested, to keep track of this value.

          However, judging from the PigInputFormat.getPigSplits code:
              for (int i = 0; i < combinedSplits.size(); i++)
                  pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));

          it seems the intention was to use splitIndex to track the current split?

          Aniket Mokashi added a comment -

          Yes. I am just wondering whether there is a way to work around this (to avoid backporting the fix in Pig). I think the pig-user list might be a better place to discuss it. Thanks for your comments.

          Daniel Dai added a comment -

          Correct me if I'm wrong, but I thought this is exactly the issue we want to solve. The LoadFunc is passed the PigSplit in prepareToRead, and we want users to call pigSplit.getWrappedSplit() to get split-specific information. The problem in current Pig is that pigSplit.getWrappedSplit() always returns split #0. That is why we have this JIRA.

          Aniket Mokashi added a comment -

          Hi Daniel,
          Is there an elegant way to work around this issue? Basically, information about the split needs to be available to the LoadFunc. I can think of getting it at the createRecordReader level on the InputFormat returned by getInputFormat. But how do I pass it down elegantly from there to the LoadFunc?
          Can you suggest an idea?
          Thanks,
          Aniket
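The workaround Aniket describes can be sketched as follows. The loader wraps the InputFormat it returns from getInputFormat, the wrapper remembers the split handed to createRecordReader, and the loader reads that value in prepareToRead. SimpleInputFormat and SimpleRecordReader are hypothetical stand-ins for Hadoop's InputFormat and RecordReader, not their real signatures.

The sketch assumes two things that should be checked against the Pig version in use:
- createRecordReader for a chunk runs before prepareToRead for that chunk.
- The same loader instance both supplies the InputFormat and receives prepareToRead.

```java
// Hypothetical minimal stand-ins for Hadoop's InputFormat and RecordReader.
interface SimpleInputFormat {
    SimpleRecordReader createRecordReader(String split);
}

interface SimpleRecordReader {
    String currentFile();
}

// Wraps the real InputFormat and remembers the last split it created a reader for.
class SplitRecordingInputFormat implements SimpleInputFormat {
    private final SimpleInputFormat delegate;
    private String lastSplit;

    SplitRecordingInputFormat(SimpleInputFormat delegate) {
        this.delegate = delegate;
    }

    @Override
    public SimpleRecordReader createRecordReader(String split) {
        lastSplit = split; // remember it before delegating
        return delegate.createRecordReader(split);
    }

    String getLastSplit() {
        return lastSplit;
    }
}

// Loader that hands out the wrapping InputFormat and keeps a reference to it.
class SplitAwareLoader {
    private final SplitRecordingInputFormat inputFormat =
            new SplitRecordingInputFormat(split -> () -> split); // trivial delegate for the demo
    String currentFile;

    SimpleInputFormat getInputFormat() {
        return inputFormat;
    }

    // Assumes createRecordReader for this chunk ran just before prepareToRead.
    void prepareToRead(SimpleRecordReader reader) {
        currentFile = inputFormat.getLastSplit();
    }

    public static void main(String[] args) {
        SplitAwareLoader loader = new SplitAwareLoader();
        SimpleInputFormat format = loader.getInputFormat();
        for (String file : new String[] {"part-0", "part-1"}) {
            loader.prepareToRead(format.createRecordReader(file));
            System.out.println(loader.currentFile); // the chunk just opened
        }
    }
}
```

This design keeps the fix entirely inside the loader, at the cost of relying on call ordering. The getWrappedSplit() fix discussed in this issue avoids that dependency.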

          Daniel Dai added a comment -

          Thanks Aniket, I just realized the patch tries to reuse splitIndex. We should add a new variable, idx, to PigSplit. It would be set by PigRecordReader.initNextRecordReader and consumed by PigSplit.getWrappedSplit to track the current InputSplit. It has nothing to do with PigSplit.splitIndex.
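A minimal sketch of that approach gives the split a separate "current wrapped split" index. The reader sets this index before the loader is prepared, and getWrappedSplit() returns the wrapped split at that index. splitIndex is never touched. FixedPigSplit, setCurrentIdx and FixedReadDemo are hypothetical names for illustration, not necessarily what the committed patch uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a separate "current wrapped split" index on the split,
// set by the record reader and read by getWrappedSplit(); splitIndex is untouched.
class FixedPigSplit {
    private final int splitIndex;         // identifies this PigSplit; never reassigned
    private final String[] wrappedSplits; // stand-ins for the wrapped InputSplits
    private int currentIdx = 0;           // hypothetical name for the new "idx" field

    FixedPigSplit(int splitIndex, String... wrappedSplits) {
        this.splitIndex = splitIndex;
        this.wrappedSplits = wrappedSplits;
    }

    int getSplitIndex() {
        return splitIndex;
    }

    int getNumPaths() {
        return wrappedSplits.length;
    }

    String getWrappedSplit(int idx) {
        return wrappedSplits[idx];
    }

    // Called by the record reader when it moves to a new chunk.
    void setCurrentIdx(int idx) {
        this.currentIdx = idx;
    }

    // What a LoadFunc would call from prepareToRead.
    String getWrappedSplit() {
        return wrappedSplits[currentIdx];
    }
}

class FixedReadDemo {
    // Simplified reader loop: record the chunk index on the split before the
    // loader is prepared, then collect what the loader would see.
    static List<String> readAll(FixedPigSplit split) {
        List<String> seenByLoader = new ArrayList<>();
        for (int idx = 0; idx < split.getNumPaths(); idx++) {
            split.setCurrentIdx(idx);
            seenByLoader.add(split.getWrappedSplit()); // stands in for loadfunc.prepareToRead(...)
        }
        return seenByLoader;
    }

    public static void main(String[] args) {
        FixedPigSplit split = new FixedPigSplit(7, "part-0", "part-1", "part-2");
        System.out.println(readAll(split));        // [part-0, part-1, part-2]
        System.out.println(split.getSplitIndex()); // 7
    }
}
```

Keeping the two indices in separate fields means that code which records getSplitIndex() still sees the PigSplit's own position, whatever chunk is being read.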

          Aniket Mokashi added a comment -

          The patch changes the meaning of splitIndex (it is not a newly added field). splitIndex is used to keep track of the PigSplit itself. While reading records with PigRecordReader, should we really change this index?

          Look at the MergeJoinIndexer code. Consider a case where each PigSplit is associated with one wrapped split and we have a couple of PigSplits. Before we do wrapperTuple.set(keysCnt+1, pigSplit.getSplitIndex()); on line 179, we call loader.getNext(). I am not very sure about this, but further down the stack this might hit PigRecordReader.initNextRecordReader, which would reset the splitIndex of every PigSplit to 0.

          It would be safer to keep the idx on PigSplit as a separate variable and copy it down from PigRecordReader as needed.

          Thoughts?
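The hazard described above can be reproduced in a small model. ReusedIndexPigSplit and IndexClobberDemo are hypothetical classes. The model assumes the reader writes the chunk index into splitIndex for every chunk, including the first, as the scenario above supposes. With one wrapped split per PigSplit, every PigSplit then reports splitIndex 0.

```java
// Hypothetical model of reusing splitIndex as the chunk index,
// simplified so the index is set for every chunk, including the first.
class ReusedIndexPigSplit {
    private int splitIndex;               // meant to identify the PigSplit itself
    private final String[] wrappedSplits;

    ReusedIndexPigSplit(int splitIndex, String... wrappedSplits) {
        this.splitIndex = splitIndex;
        this.wrappedSplits = wrappedSplits;
    }

    int getNumPaths() { return wrappedSplits.length; }
    int getSplitIndex() { return splitIndex; }
    void setSplitIndex(int splitIndex) { this.splitIndex = splitIndex; }
    String getWrappedSplit() { return wrappedSplits[splitIndex]; }
}

class IndexClobberDemo {
    // Reads every chunk the way the reused-index approach would, then returns the
    // splitIndex that a consumer such as MergeJoinIndexer would record afterwards.
    static int splitIndexAfterReading(ReusedIndexPigSplit split) {
        for (int idx = 0; idx < split.getNumPaths(); idx++) {
            split.setSplitIndex(idx); // overwrites the PigSplit's own identity
        }
        return split.getSplitIndex();
    }

    public static void main(String[] args) {
        // Three PigSplits with one wrapped split each, as in the scenario above.
        for (int i = 0; i < 3; i++) {
            ReusedIndexPigSplit split = new ReusedIndexPigSplit(i, "chunk-" + i);
            System.out.println("PigSplit " + i + " reports splitIndex " + splitIndexAfterReading(split));
        }
    }
}
```

After reading, PigSplits 0, 1 and 2 all report 0, so an index built from getSplitIndex() could no longer tell them apart. A separate field avoids this.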

          Prashant Kommireddi added a comment -

          I did not find anything wrong either by looking at the code, just making sure.

          Daniel Dai added a comment -

          We need to test it. But I feel that returning the current split instead of split 0 is the right approach. Browsing through the code, I didn't find anything wrong with the change.

          Prashant Kommireddi added a comment -

          How would this change affect MergeJoinIndexer.java which uses pigSplit.getWrappedSplit() ?

          Daniel Dai added a comment -

          Yes, you are right. That's why you pass the index from PigRecordReader to PigSplit. Your approach looks right, except for the name "splitIndex": we usually use it for the index of the PigSplit itself, not the index of the InputSplit inside the PigSplit. It's better to use "idx" to make it less confusing.

          Alex Rovner added a comment -

          Daniel, there is no member idx in PigSplit.java. The index that's supposed to be tracked is splitIndex. Furthermore, with the combined input format, PigRecordReader does not update splitIndex when it switches from one split to the next, even though it increments and uses its own index internally. Therefore, if we just change wrappedSplits[0] to wrappedSplits[splitIndex], you will still have this issue. I have verified that splitIndex is not modified anywhere except through the constructor in PigSplit.java.

          I have made the needed code changes and verified them with my loader. Now the log messages from my loader correspond to the log messages from PigRecordReader (originally that was not the case).

          Alex Rovner added a comment -

          Attaching a patch generated by git format-patch. I couldn't verify all unit tests, since there is currently an issue with them in trunk. Most of the unit tests passed, and I have verified this patch with my loader, which had the issue described above.

          Daniel Dai added a comment -

          splitIndex is the index of PigSplit, idx keeps track of current InputSplit within PigSplit. I feel changing wrappedSplits[0] into wrappedSplits[idx] should be enough.

          Alex Rovner added a comment -

          I am attempting to make a patch. Proposed fixes:

          Add pigSplit.setSplitIndex(idx) before curReader.initialize(pigSplit.getWrappedSplit(idx), context);

          Change return wrappedSplits[0]; to return wrappedSplits[splitIndex]; in PigSplit.getWrappedSplit().


            People

            • Assignee:
              Alex Rovner
            • Reporter:
              Alex Rovner