Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java	(revision 1505606)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java	(working copy)
@@ -19,15 +19,18 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
@@ -43,31 +46,50 @@
   @Override
   protected void reduce(K row, Iterable<Put> vals, Context context)
       throws IOException, InterruptedException {
-
-    int cnt = 0;
-    // There's nothing to say K row is the same as the rowkey
-    // used to construct Puts (value) instances. Thus the map of put.getRow()
-    // to combined Put is necessary.
-    // TODO: would be better if we knew K row and Put rowkey were
-    // identical. Then this whole Put buffering business goes away.
-    // TODO: Could use HeapSize to create an upper bound on the memory size of
-    // the puts map and flush some portion of the content while looping. This
+    // Using HeapSize to create an upper bound on the memory size of
+    // the puts and flush some portion of the content while looping. This
     // flush could result in multiple Puts for a single rowkey. That is
     // acceptable because Combiner is run as an optimization and it's not
     // critical that all Puts are grouped perfectly.
-    Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
+    long threshold = context.getConfiguration().getLong(
+        "putcombiner.row.threshold", 1L * (1<<30));
+    int cnt = 0;
+    long curSize = 0;
+    Put put = null;
+    Map<byte[], List<Cell>> familyMap = null;
     for (Put p : vals) {
       cnt++;
-      if (!puts.containsKey(p.getRow())) {
-        puts.put(p.getRow(), p);
+      if (put == null) {
+        put = p;
+        familyMap = put.getFamilyMap();
       } else {
-        puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
+        for (Entry<byte[], List<Cell>> entry : p.getFamilyMap()
+            .entrySet()) {
+          List<Cell> cells = familyMap.get(entry.getKey());
+          List<KeyValue> kvs = (cells != null) ? (List) cells : null;
+          for (Cell cell : entry.getValue()) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            curSize += kv.heapSize();
+            if (kvs != null) {
+              kvs.add(kv);
+            }
+          }
+          if (cells == null) {
+            familyMap.put(entry.getKey(), entry.getValue());
+          }
+        }
+        if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
+        if (curSize > threshold) {
+          LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+          context.write(row, put);
+          put = null;
+          cnt = 0;
+        }
       }
     }
-
-    for (Put p : puts.values()) {
-      context.write(row, p);
+    if (put != null) {
+      LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
+      context.write(row, put);
     }
-    LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
   }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java	(revision 1505606)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java	(working copy)
@@ -55,7 +55,7 @@
   {
     // although reduce() is called per-row, handle pathological case
     long threshold = context.getConfiguration().getLong(
-        "putsortreducer.row.threshold", 2L * (1<<30));
+        "putsortreducer.row.threshold", 1L * (1<<30));
     Iterator<Put> iter = puts.iterator();
     while (iter.hasNext()) {
       TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
@@ -67,7 +67,7 @@
          for (Cell cell: cells) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
            map.add(kv);
-           curSize += kv.getLength();
+           curSize += kv.heapSize();
          }
        }
      }