.../hadoop/hbase/mapreduce/PutSortReducer.java | 86 +++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index a71b66a2f5..c274da4150 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -18,17 +18,22 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.TreeSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.StringUtils; @@ -88,4 +93,81 @@ public class PutSortReducer extends } } } + + private void reduce1Row(Put put) throws IOException { + TreeSet map = new TreeSet(KeyValue.COMPARATOR); + long curSize = 0; + Put p = put; + for (List cells : p.getFamilyCellMap().values()) { + for (Cell cell : cells) { + KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); + if (map.add(kv)) {// don't count duplicated kv into size + curSize += kv.heapSize(); + } + } + } + } + + public static void main(String[] args) throws IOException { + PutSortReducer psr = new PutSortReducer(); + TsvImporterMapper tim = new TsvImporterMapper(); + Configuration conf = new Configuration(); + conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/home/123"); + StringBuilder sb = new StringBuilder(); + sb.append("HBASE_ROW_KEY,HBASE_TS_KEY,d:time,d:num,d:bid,d:prol,d:up,d:down"); + for (int i = 7; i <= 300; i++) { + sb.append(",d:col" + i); + } + + ImportTsv.TsvParser parser = new ImportTsv.TsvParser(sb.toString(), ","); + tim.parser = parser; + + StringBuilder sb1 = new StringBuilder(); + sb1.append("18600000001201501011000000068100,18600000001,20150101,7,325,281701,1724,1724"); + for (int i = 7; i <= 300; i++) { + sb1.append(",1111"); + } + byte[] lineBytes = Bytes.toBytes(sb1.toString()); + + try { + ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, sb1.length()); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength()); + + Put put = new Put(rowKey.copyBytes()); + List cells = new ArrayList(); + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() + || i == parser.getAttributesKeyColumnIndex() + || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex() + || (false && parsed.getColumnLength(i) == 0)) { + continue; + } + + Cell cell = psr.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, + parser.getQualifier(i).length, parsed.getTimestamp(0), lineBytes, + parsed.getColumnOffset(i), parsed.getColumnLength(i), new ArrayList()); + put.add(cell); + } + // int rows = Integer.parseInt(args[0]); + int rows = Integer.parseInt("100000"); + long t1 = System.currentTimeMillis(); + for (int j = 0; j < rows; j++) { + psr.reduce1Row(put); + } + System.out.println("Time took for reduce process is " + (System.currentTimeMillis() - t1)); + + } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException badLine) { + badLine.printStackTrace(); + } + } + + public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, + byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, + int vlength, ArrayList tags) throws IOException { + return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, + qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags); + } + }