Uploaded image for project: 'Hadoop Common'
  1. Hadoop Common
  2. HADOOP-20

Mapper, Reducer need an occasion to cleanup after the last record is processed.

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.1.0
    • Component/s: None
    • Labels:
      None
    • Environment:

      Linux

      Description

      Mapper, Reducer need an occasion to do some cleanup after the last record is processed.
      Proposal (patch attached)
      in interface Mapper:
      add method void finished();
      in interface Reducer:
      add method void finished();

      finished() methods are called from MapTask, CombiningCollector, ReduceTask.
      ------------
      Known limitation: Fetcher (a multithreaded MapRunnable) does not call finished().
      This is not currently a problem bec. fetcher Map/Reduce modules do not do anything in finished().
      The right way to add finished() support to Fetcher would be to wait for all threads to finish,
      then do:
      if (collector instanceof CombiningCollector) ((CombiningCollector)collector).finished();
      ------------
      patch begins: (svn trunk)

      Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
      ===================================================================
      — src/test/org/apache/nutch/mapred/MapredLoadTest.java (revision 374781)
      +++ src/test/org/apache/nutch/mapred/MapredLoadTest.java (working copy)
      @@ -69,6 +69,8 @@
      out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
      }
      }
      + public void finished()

      { + }
      }
      static class RandomGenReducer implements Reducer {
      public void configure(JobConf job) { @@ -81,6 +83,8 @@ out.collect(new UTF8("" + val), new UTF8("")); }
      }
      + public void finished() {+ }

      }
      static class RandomCheckMapper implements Mapper {
      public void configure(JobConf job)

      { @@ -92,6 +96,8 @@ out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1)); }

      + public void finished()

      { + }
      }
      static class RandomCheckReducer implements Reducer {
      public void configure(JobConf job) { @@ -106,6 +112,8 @@ }
      out.collect(new IntWritable(keyint), new IntWritable(count));
      }
      + public void finished() {+ }

      }

      int range;
      Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
      ===================================================================
      — src/test/org/apache/nutch/fs/TestNutchFileSystem.java (revision 374783)
      +++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java (working copy)
      @@ -155,6 +155,8 @@

      reporter.setStatus("wrote " + name);
      }
      +
      + public void finished() {}
      }

      public static void writeTest(NutchFileSystem fs, boolean fastCheck)
      @@ -247,6 +249,9 @@

      reporter.setStatus("read " + name);
      }
      +
      + public void finished() {}
      +
      }

      public static void readTest(NutchFileSystem fs, boolean fastCheck)
      @@ -339,6 +344,9 @@
      in.close();
      }
      }
      +
      + public void finished() {}
      +
      }

      public static void seekTest(NutchFileSystem fs, boolean fastCheck)
      Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
      ===================================================================
      — src/java/org/apache/nutch/indexer/DeleteDuplicates.java (revision 374776)
      +++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java (working copy)
      @@ -225,6 +225,7 @@
      }
      }
      }
      + public void finished() {}
      }

      private NutchFileSystem fs;
      @@ -265,6 +266,8 @@
      reader.close();
      }
      }
      +
      + public void finished() {}

      /** Write nothing. */
      public RecordWriter getRecordWriter(final NutchFileSystem fs,
      Index: src/java/org/apache/nutch/indexer/Indexer.java
      ===================================================================
      — src/java/org/apache/nutch/indexer/Indexer.java (revision 374778)
      +++ src/java/org/apache/nutch/indexer/Indexer.java (working copy)
      @@ -227,6 +227,8 @@

      output.collect(key, new ObjectWritable(doc));
      }
      +
      + public void finished() {}

      public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
      throws IOException

      { Index: src/java/org/apache/nutch/segment/SegmentReader.java =================================================================== --- src/java/org/apache/nutch/segment/SegmentReader.java (revision 374778) +++ src/java/org/apache/nutch/segment/SegmentReader.java (working copy) @@ -143,7 +143,9 @@ }

      output.collect(key, new ObjectWritable(dump.toString()));
      }
      -
      +
      + public void finished() {}
      +
      public void reader(File segment) throws IOException {
      LOG.info("Reader: segment: " + segment);

      Index: src/java/org/apache/nutch/mapred/Mapper.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/Mapper.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/Mapper.java (working copy)
      @@ -39,4 +39,9 @@
      void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter)
      throws IOException;
      +
      + /** Called after the last

      {@link #map}

      call on this Mapper object.
      + Typical implementations do nothing.
      + */
      + void finished();
      }
      Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/RegexMapper.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java (working copy)
      @@ -53,4 +53,5 @@
      output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
      }
      }
      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/InverseMapper.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java (working copy)
      @@ -38,4 +38,6 @@
      throws IOException

      { output.collect((WritableComparable)value, key); }

      +
      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (working copy)
      @@ -42,4 +42,5 @@
      }
      }

      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (working copy)
      @@ -39,4 +39,5 @@
      output.collect(key, val);
      }

      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (working copy)
      @@ -47,4 +47,6 @@
      // output sum
      output.collect(key, new LongWritable(sum));
      }
      +
      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (working copy)
      @@ -50,4 +50,6 @@
      output.collect(new UTF8(st.nextToken()), new LongWritable(1));
      }
      }
      +
      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/mapred/ReduceTask.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/ReduceTask.java (revision 374781)
      +++ src/java/org/apache/nutch/mapred/ReduceTask.java (working copy)
      @@ -275,6 +275,7 @@
      }

      } finally {
      + reducer.finished();
      in.close();
      lfs.delete(new File(sortedFile)); // remove sorted
      out.close(reporter);
      Index: src/java/org/apache/nutch/mapred/MapTask.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/MapTask.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/MapTask.java (working copy)
      @@ -50,7 +50,7 @@
      public void write(DataOutput out) throws IOException

      { super.write(out); split.write(out); - + }

      public void readFields(DataInput in) throws IOException

      { super.readFields(in); @@ -126,6 +126,10 @@ }

      } finally {
      + if (combining)

      { + ((CombiningCollector)collector).finished(); + }

      +
      in.close(); // close input
      }
      } finally {
      @@ -147,5 +151,5 @@
      public NutchConf getConf()

      { return this.nutchConf; }
      • +
        }
        Index: src/java/org/apache/nutch/mapred/MapRunner.java
        ===================================================================

          • src/java/org/apache/nutch/mapred/MapRunner.java (revision 374737)
            +++ src/java/org/apache/nutch/mapred/MapRunner.java (working copy)
            @@ -38,18 +38,22 @@
            public void run(RecordReader input, OutputCollector output,
            Reporter reporter)
            throws IOException {
      • while (true) {
      • // allocate new key & value instances
      • WritableComparable key =
      • (WritableComparable)job.newInstance(inputKeyClass);
      • Writable value = (Writable)job.newInstance(inputValueClass);
        + try
        Unknown macro: {+ while (true) { + // allocate new key & value instances + WritableComparable key = + (WritableComparable)job.newInstance(inputKeyClass); + Writable value = (Writable)job.newInstance(inputValueClass); - // read next key & value - if (!input.next(key, value)) - return; + // read next key & value + if (!input.next(key, value)) + return; - // map pair to output - mapper.map(key, value, output, reporter); + // map pair to output + mapper.map(key, value, output, reporter); + }+ }

        finally

        { + mapper.finished(); }

        }

      Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/CombiningCollector.java (revision 374780)
      +++ src/java/org/apache/nutch/mapred/CombiningCollector.java (working copy)
      @@ -78,4 +78,9 @@
      count = 0;
      }

      + public synchronized void finished()
      +

      { + combiner.finished(); + }

      +
      }
      Index: src/java/org/apache/nutch/mapred/Reducer.java
      ===================================================================
      — src/java/org/apache/nutch/mapred/Reducer.java (revision 374737)
      +++ src/java/org/apache/nutch/mapred/Reducer.java (working copy)
      @@ -38,4 +38,10 @@
      void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter)
      throws IOException;
      +
      + /** Called after the last

      {@link #reduce}

      call on this Reducer object.
      + Typical implementations do nothing.
      + */
      + void finished();
      +
      }
      Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
      ===================================================================
      — src/java/org/apache/nutch/crawl/CrawlDbReader.java (revision 374737)
      +++ src/java/org/apache/nutch/crawl/CrawlDbReader.java (working copy)
      @@ -50,9 +50,9 @@

      /**

      • Read utility for the CrawlDB.
      • *
        + *
      • @author Andrzej Bialecki
      • *
        + *
        */
        public class CrawlDbReader { @@ -68,6 +68,7 @@ output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch())); output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0))); }

        + public void finished() {}
        }

      public static class CrawlDbStatReducer implements Reducer

      { @@ -121,6 +122,7 @@ output.collect(new UTF8("avg score"), new LongWritable(total / cnt)); }

      }
      + public void finished() {}
      }

      public static class CrawlDbDumpReducer implements Reducer {
      @@ -133,8 +135,11 @@

      public void configure(JobConf job) {
      }
      +
      + public void finished()

      { + }

      }

      • +
        public void processStatJob(String crawlDb, NutchConf config) throws IOException

        { LOG.info("CrawlDb statistics start: " + crawlDb); File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis()); @@ -219,7 +224,7 @@ System.out.println("not found"); }

        }

      • +
        public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException

        { LOG.info("CrawlDb dump: starting"); @@ -270,4 +275,5 @@ }

        return;
        }
        +
        }
        Index: src/java/org/apache/nutch/crawl/LinkDb.java
        ===================================================================

          • src/java/org/apache/nutch/crawl/LinkDb.java (revision 374779)
            +++ src/java/org/apache/nutch/crawl/LinkDb.java (working copy)
            @@ -118,7 +118,8 @@
            output.collect(key, result);
            }

      -
      + public void finished() {}
      +
      public void invert(File linkDb, File segmentsDir) throws IOException

      { LOG.info("LinkDb: starting"); LOG.info("LinkDb: linkdb: " + linkDb); Index: src/java/org/apache/nutch/crawl/Injector.java =================================================================== --- src/java/org/apache/nutch/crawl/Injector.java (revision 374779) +++ src/java/org/apache/nutch/crawl/Injector.java (working copy) @@ -65,6 +65,8 @@ interval)); }

      }
      +
      + public void finished() {}
      }

      /** Combine multiple new entries for a url. */
      @@ -76,6 +78,7 @@
      throws IOException

      { output.collect(key, (Writable)values.next()); // just collect first value }

      + public void finished() {}
      }

      /** Construct an Injector. */
      Index: src/java/org/apache/nutch/crawl/Generator.java
      ===================================================================
      — src/java/org/apache/nutch/crawl/Generator.java (revision 374779)
      +++ src/java/org/apache/nutch/crawl/Generator.java (working copy)
      @@ -63,6 +63,8 @@
      output.collect(crawlDatum, key); // invert for sort by score
      }

      + public void finished() {}
      +
      /** Partition by host (value). */
      public int getPartition(WritableComparable key, Writable value,
      int numReduceTasks)

      { Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java =================================================================== --- src/java/org/apache/nutch/crawl/CrawlDbReducer.java (revision 374781) +++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java (working copy) @@ -115,4 +115,5 @@ }

      }

      + public void finished() {}
      }
      Index: src/java/org/apache/nutch/parse/ParseSegment.java
      ===================================================================
      — src/java/org/apache/nutch/parse/ParseSegment.java (revision 374776)
      +++ src/java/org/apache/nutch/parse/ParseSegment.java (working copy)
      @@ -78,6 +78,8 @@
      throws IOException

      { output.collect(key, (Writable)values.next()); // collect first value }

      +
      + public void finished() {}

      public void parse(File segment) throws IOException {
      LOG.info("Parse: starting");

        Attachments

        1. mrclose.patch
          9 kB
          Michel Tourn
        2. mapredfinished.log
          14 kB
          Michel Tourn

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              michel_tourn Michel Tourn
            • Votes:
              1 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: