  1. Pig
  2. PIG-2462

getWrappedSplit is incorrectly returning the first split instead of the current split.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.1, 0.11
    • Fix Version/s: 0.9.2, 0.10.0, 0.11
    • Component/s: None
    • Labels: None
    • Patch Info: Patch Available
    • Hadoop Flags: Reviewed

      Description

      If a loader needs to know which file is currently being read (for example, to look up schema information), Pig provides this by calling prepareToRead every time a new split is read. This is critical for CombinedInputFormat, because each mapper can read more than one file. To find out which file is currently being read, the load function should call getWrappedSplit(). However, getWrappedSplit() always returns the first split in the list. Code from PigSplit.java:

      /**
       * This methods returns the actual InputSplit (as returned by the
       * {@link InputFormat}) which this class is wrapping.
       * @return the wrappedSplit
       */
      public InputSplit getWrappedSplit() {
          return wrappedSplits[0];
      }

      Furthermore, in PigRecordReader.java the splitIndex is never incremented when moving from one split to the next. So even if getWrappedSplit() were changed to return wrappedSplits[splitIndex], it would still return the split at the wrong index.
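
      The two halves of the problem can be illustrated with a minimal, self-contained sketch. This is not the Pig source: ToySplit is a hypothetical stand-in for PigSplit, plain strings stand in for InputSplit objects, and setCurrentIdx is an assumed name for whatever call the reader would use to advance the index.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for PigSplit; strings play the role of InputSplit objects.
class ToySplit {
    private final List<String> wrappedSplits;
    private int splitIndex = 0; // index of the wrapped split currently being read

    ToySplit(String... paths) {
        this.wrappedSplits = Arrays.asList(paths);
    }

    // Behaviour described in the report: always the first wrapped split.
    String getWrappedSplitBuggy() {
        return wrappedSplits.get(0);
    }

    // Proposed behaviour: the wrapped split at the current index.
    String getWrappedSplit() {
        return wrappedSplits.get(splitIndex);
    }

    // Assumed setter; the reader must call it whenever it moves to another split.
    void setCurrentIdx(int idx) {
        splitIndex = idx;
    }
}

class ToySplitDemo {
    public static void main(String[] args) {
        ToySplit split = new ToySplit("a.avro", "b.avro");

        // If the reader never updates the index, both variants report the first split.
        System.out.println(split.getWrappedSplit());      // a.avro

        // Once the index is advanced, only the index-based getter follows it.
        split.setCurrentIdx(1);
        System.out.println(split.getWrappedSplitBuggy()); // a.avro
        System.out.println(split.getWrappedSplit());      // b.avro
    }
}
```

      The sketch shows why changing the getter alone is not enough: until something advances splitIndex, the index-based getter still reports the first split.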

      This can be fixed by changing PigRecordReader to increment PigSplit.splitIndex every time the split changes, in the following code:

      /**
       * Get the record reader for the next chunk in this CombineFileSplit.
       */
      protected boolean initNextRecordReader() throws IOException, InterruptedException {

          if (curReader != null) {
              curReader.close();
              curReader = null;
              if (idx > 0) {
                  progress += pigSplit.getLength(idx-1); // done processing so far
              }
          }

          // if all chunks have been processed, nothing more to do.
          if (idx == pigSplit.getNumPaths()) {
              return false;
          }

          // get a record reader for the idx-th chunk
          try {
              curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
              LOG.info("Current split being processed "+pigSplit.getWrappedSplit(idx));

              if (idx > 0) {
                  // initialize() for the first RecordReader will be called by MapTask;
                  // we're responsible for initializing subsequent RecordReaders.
                  curReader.initialize(pigSplit.getWrappedSplit(idx), context);
                  pigSplit.get
                  loadfunc.prepareToRead(curReader, pigSplit);
              }
          } catch (Exception e) {
              throw new RuntimeException (e);
          }

          idx++;
          return true;
      }
      }
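
      A rough sketch of the proposed change, under stated assumptions: ChunkedSplit and ChunkedReader are hypothetical stand-ins for PigSplit and PigRecordReader, setSplitIndex is an assumed setter, and strings stand in for InputSplit objects. Unlike the real reader, this model calls prepareToRead for every chunk, including the first, so that the whole sequence is visible in one place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PigSplit; strings stand in for wrapped InputSplits.
class ChunkedSplit {
    private final String[] wrappedSplits;
    private int splitIndex = 0;

    ChunkedSplit(String... wrappedSplits) {
        this.wrappedSplits = wrappedSplits;
    }

    int getNumPaths() { return wrappedSplits.length; }

    // Index-based getter, so the result follows the reader's position.
    String getWrappedSplit() { return wrappedSplits[splitIndex]; }

    // Assumed setter used by the reader when it moves to another chunk.
    void setSplitIndex(int idx) { splitIndex = idx; }
}

// Hypothetical stand-in for PigRecordReader, reduced to the chunk-advancing logic.
class ChunkedReader {
    private final ChunkedSplit split;
    private int idx = 0;

    // What a loader would observe through getWrappedSplit() in prepareToRead.
    final List<String> seenByLoader = new ArrayList<>();

    ChunkedReader(ChunkedSplit split) {
        this.split = split;
    }

    boolean initNextRecordReader() {
        if (idx == split.getNumPaths()) {
            return false; // all chunks have been processed
        }
        // Proposed fix: keep the split's own index in step with idx
        // before the loader is told to prepare for the next chunk.
        split.setSplitIndex(idx);
        prepareToRead(split);
        idx++;
        return true;
    }

    // Models a loader that asks the split which file it is about to read.
    private void prepareToRead(ChunkedSplit s) {
        seenByLoader.add(s.getWrappedSplit());
    }
}

class ChunkedReaderDemo {
    public static void main(String[] args) {
        ChunkedReader reader =
            new ChunkedReader(new ChunkedSplit("part-0", "part-1", "part-2"));
        while (reader.initNextRecordReader()) {
            // records of the current chunk would be consumed here
        }
        System.out.println(reader.seenByLoader); // [part-0, part-1, part-2]
    }
}
```

      If the setSplitIndex call is removed, the loader in this model sees part-0 three times, which matches the behaviour described in this report.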

        Attachments

        1. splitsfix.patch
          3 kB
          Alex Rovner
        2. split_fix_take2.patch
          3 kB
          Alex Rovner
        3. PIG-2462-2.patch
          7 kB
          Daniel Dai
        4. PIG-2462-2_0.9.patch
          6 kB
          Daniel Dai

              People

              • Assignee: Alex Rovner (arov)
              • Reporter: Alex Rovner (arov)