  1. Pig
  2. PIG-5359

Reduce time spent in split serialization


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.18.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

    Description

      1. Unnecessary serialization of splits in Tez.
      In LoaderProcessor, Pig calls
      https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172

      tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
      

      This call ends up serializing all the splits just to print a log message.

      https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317

        public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
            boolean groupSplits, boolean sortSplits, int targetTasks)
            throws IOException, ClassNotFoundException, InterruptedException {
          ....
          ....
          LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
              + splitInfoMem.getSplitsProto().getSerializedSize());
          return splitInfoMem;
      
      

      https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106

        public MRSplitsProto getSplitsProto() {
          if (isNewSplit) {
            try {
              return createSplitsProto(newFormatSplits, new SerializationFactory(conf));
      
      

      https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170

        private static MRSplitsProto createSplitsProto(
            org.apache.hadoop.mapreduce.InputSplit[] newSplits,
            SerializationFactory serializationFactory) throws IOException,
            InterruptedException {
          MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
      
          for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
            splitsBuilder.addSplits(MRInputHelpers.createSplitProto(newSplit, serializationFactory));
          }
          return splitsBuilder.build();
        }
      
      

      https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259
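The cost described in item 1 can be illustrated with a minimal sketch. Everything here is hypothetical: `SplitHolder`, `cheapLogLine`, and `costlyLogLine` are invented stand-ins and not part of the Tez or Pig API. The sketch assumes, as the excerpts above suggest, that the size getter re-serializes every split on each call and caches nothing.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

class LogSerializationSketch {

    /** Hypothetical stand-in for a split holder; NOT the Tez InputSplitInfoMem API. */
    static final class SplitHolder {
        private final List<String> splits;
        int serializationCalls = 0; // counts how often the expensive path runs

        SplitHolder(List<String> splits) {
            this.splits = splits;
        }

        /** Cheap: only looks at the number of splits. */
        int getNumTasks() {
            return splits.size();
        }

        /** Expensive: walks and encodes every split on each call; nothing is cached. */
        int getSerializedSize() {
            serializationCalls++;
            int size = 0;
            for (String split : splits) {
                size += split.getBytes(StandardCharsets.UTF_8).length;
            }
            return size;
        }
    }

    /** Log line that never touches the serialization path. */
    static String cheapLogLine(SplitHolder holder) {
        return "NumSplits: " + holder.getNumTasks();
    }

    /** Log line that forces a full serialization pass just to report a size. */
    static String costlyLogLine(SplitHolder holder) {
        return "NumSplits: " + holder.getNumTasks()
                + ", SerializedSize: " + holder.getSerializedSize();
    }
}
```

In this sketch, building the second log line is what triggers the serialization pass; the first reports the split count without doing any of that work.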

      2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, the InputSplits already serialized into MRSplitsProto are not used by Pig. Instead, Pig serializes them again directly to disk via JobSplitWriter.createSplitFiles. The InputSplit serialization logic therefore runs a second time, which is wasteful and expensive in cases like HCat.

      https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947

      MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
      int splitsSerializedSize = splitsProto.getSerializedSize();
      

      getSplitsProto creates an MRSplitsProto, which consists of a list of MRSplitProto. Each MRSplitProto holds the serialized bytes of one InputSplit. If splitsSerializedSize > spillThreshold, Pig writes the splits to disk via:

      if(splitsSerializedSize > spillThreshold) {
          inputPayLoad.setBoolean(
                  org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                  false);
          // Write splits to disk
          Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
          log.info("Writing input splits to " + inputSplitsDir
                  + " for vertex " + vertex.getName()
                  + " as the serialized size in memory is "
                  + splitsSerializedSize + ". Configured "
                  + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
                  + " is " + spillThreshold);
          inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
                  (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs);
      
      

      https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960
      https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314

      Solution:
      1. Do not serialize the split in LoaderProcessor.java
      2. In TezDagBuilder.java, serialize each input split once and keep a running total of the serialized sizes. If the total exceeds spillThreshold, write the splits to disk, reusing the already-serialized buffer of each split.
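
The second part of the proposed solution can be sketched roughly as follows. This is only an illustration of the idea, not the code from the attached patches: `SplitSpillSketch`, `Split`, `Result`, and `serializeOnce` are hypothetical names, and `Split` is a simplified stand-in for a Hadoop InputSplit. Each split is serialized exactly once, its buffer is kept, and the running total decides whether the caller should write those same buffers to disk.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class SplitSpillSketch {

    /** Hypothetical stand-in for an InputSplit that can write itself out. */
    interface Split {
        void write(DataOutputStream out) throws IOException;
    }

    /** Serialized buffers, their total size, and the spill decision. */
    static final class Result {
        final List<byte[]> buffers;
        final long totalBytes;
        final boolean spillToDisk;

        Result(List<byte[]> buffers, long totalBytes, boolean spillToDisk) {
            this.buffers = buffers;
            this.totalBytes = totalBytes;
            this.spillToDisk = spillToDisk;
        }
    }

    /**
     * Serializes every split exactly once and accumulates the sizes.
     * The returned buffers can be reused whether the splits are kept
     * in memory or written to disk, so nothing is serialized twice.
     */
    static Result serializeOnce(List<Split> splits, long spillThreshold) {
        List<byte[]> buffers = new ArrayList<>(splits.size());
        long total = 0;
        for (Split split : splits) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                split.write(out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            byte[] buffer = bytes.toByteArray();
            buffers.add(buffer);
            total += buffer.length;
        }
        return new Result(buffers, total, total > spillThreshold);
    }
}
```

A caller would check `spillToDisk` and, if set, write `buffers` to the split files directly rather than invoking the split serialization logic again.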

       

      Thank you Rohini Palaniswamy for identifying the issue.

      Attachments

        1. PIG-5359-amend-2.patch
          3 kB
          Satish Saley
        2. PIG-5359-amend-1.patch
          1 kB
          Satish Saley
        3. PIG-5359-3.patch
          28 kB
          Rohini Palaniswamy


          People

            Assignee: Satish Saley (satishsaley)
            Reporter: Satish Saley (satishsaley)
            Votes: 0
            Watchers: 2
