Description
I tried adding the following combiner to LinkDb:
public static enum Counters { COMBINED }

public static class LinkDbCombiner extends MapReduceBase implements Reducer {
  private int _maxInlinks;

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    // cap on inlinks kept per URL (Nutch's db.max.inlinks property)
    _maxInlinks = job.getInt("db.max.inlinks", 10000);
  }

  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    final Inlinks inlinks = (Inlinks) values.next();
    int combined = 0;
    while (values.hasNext()) {
      Inlinks val = (Inlinks) values.next();
      for (Iterator it = val.iterator(); it.hasNext();) {
        // once the cap is reached, emit what has been merged so far and stop
        if (inlinks.size() >= _maxInlinks) {
          if (combined > 0) {
            reporter.incrCounter(Counters.COMBINED, combined);
          }
          output.collect(key, inlinks);
          return;
        }
        Inlink in = (Inlink) it.next();
        inlinks.add(in);
      }
      combined++;
    }
    if (inlinks.size() == 0) return;
    if (combined > 0) {
      reporter.incrCounter(Counters.COMBINED, combined);
    }
    output.collect(key, inlinks);
  }
}
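The core of the combiner is the merge-and-cap loop: successive Inlinks values for the same key are folded into the first one until the configured cap is hit. The loop can be sketched without any Hadoop dependencies; the class below is a hypothetical stand-in that uses plain string lists in place of Inlinks and a hard-coded cap in place of db.max.inlinks:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombinerSketch {
    static final int MAX_INLINKS = 3; // stand-in for db.max.inlinks

    // Merge the per-key inlink lists into the first one, stopping once
    // the cap is reached -- the same shape as the combiner's reduce loop.
    static List<String> combine(List<List<String>> values) {
        List<String> merged = new ArrayList<>(values.get(0));
        for (int i = 1; i < values.size(); i++) {
            for (String in : values.get(i)) {
                if (merged.size() >= MAX_INLINKS) return merged;
                merged.add(in);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> values = Arrays.asList(
            Arrays.asList("a.com", "b.com"),
            Arrays.asList("c.com", "d.com"));
        // four candidate inlinks, but the cap keeps only three
        System.out.println(combine(values).size());
    }
}
```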
This greatly reduced the time it took to generate a new LinkDb; in my case it cut the run time in half.
Map output records: 8717810541
Combined: 7632541507
Resulting output records: 1085269034
That's an 87% reduction in output records from the map phase.
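The counters are consistent with each other, which a quick arithmetic check confirms (the class name below is just for illustration):

```java
public class ReductionCheck {
    public static void main(String[] args) {
        long mapOutput = 8717810541L;  // map output records
        long combined  = 7632541507L;  // records absorbed by the combiner
        // records that actually reach the shuffle
        System.out.println(mapOutput - combined);
        // fraction of map output eliminated by the combiner
        System.out.printf("%.2f%%%n", 100.0 * combined / mapOutput);
    }
}
```

The difference matches the reported 1085269034 output records, and the eliminated fraction rounds to the ~87% quoted above.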