.../hadoop/hbase/mapreduce/PutSortReducer.java | 110 +++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index f4ad1f25fe..b22da6ffaf 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -144,4 +144,114 @@ public class PutSortReducer extends } } } + + private void reduce1Row(Put put) throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet<>(CellComparator.getInstance()); + long curSize = 0; + List tags = new ArrayList<>(); + tags.clear(); + Put p = put; + long t = p.getTTL(); + if (t != Long.MAX_VALUE) { + // add TTL tag if found + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); + } + byte[] acl = p.getACL(); + if (acl != null) { + // add ACL tag if found + tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); + } + try { + CellVisibility cellVisibility = p.getCellVisibility(); + if (cellVisibility != null) { + // add the visibility labels if any + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibility.getExpression())); + } + } catch (DeserializationException e) { + // We just throw exception here. Should we allow other mutations to proceed by + // just ignoring the bad one? + throw new IOException("Invalid visibility expression found in mutation " + p, e); + } + for (List cells : p.getFamilyCellMap().values()) { + for (Cell cell : cells) { + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. + KeyValue kv = null; + TagUtil.carryForwardTags(tags, cell); + if (!tags.isEmpty()) { + kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength(), tags); + } else { + kv = KeyValueUtil.ensureKeyValue(cell); + } + if (map.add(kv)) {// don't count duplicated kv into size + curSize += kv.heapSize(); + } + } + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + PutSortReducer psr = new PutSortReducer(); + TsvImporterMapper tim = new TsvImporterMapper(); + Configuration conf = new Configuration(); + conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/home/123"); + StringBuilder sb = new StringBuilder(); + sb.append("HBASE_ROW_KEY,HBASE_TS_KEY,d:time,d:num,d:bid,d:prol,d:up,d:down"); + for (int i = 7; i <= 300; i++) { + sb.append(",d:col" + i); + } + + ImportTsv.TsvParser parser = new ImportTsv.TsvParser(sb.toString(), ","); + tim.parser = parser; + StringBuilder sb1 = new StringBuilder(); + sb1.append("18600000001201501011000000068100,18600000001,20150101,7,325,281701,1724,1724"); + for (int i = 7; i <= 300; i++) { + sb1.append(",1111"); + } + byte[] lineBytes = Bytes.toBytes(sb1.toString()); + + try { + ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, sb1.length()); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength()); + + Put put = new Put(rowKey.copyBytes()); + List cells = new ArrayList(); + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() + || i == parser.getAttributesKeyColumnIndex() + || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex() + || (false && parsed.getColumnLength(i) == 0)) { + continue; + } + Cell cell = psr.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, + parser.getQualifier(i).length, parsed.getTimestamp(0), lineBytes, + parsed.getColumnOffset(i), parsed.getColumnLength(i), new ArrayList<>()); + put.add(cell); + } + // int rows = Integer.parseInt(args[0]); + int rows = Integer.parseInt("100000"); + long t1 = System.currentTimeMillis(); + for (int j = 0; j < rows; j++) { + psr.reduce1Row(put); + } + System.out.println("Time took for reduce process is " + (System.currentTimeMillis() - t1)); + + } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException badLine) { + badLine.printStackTrace(); + } + } + + public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, + byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, + int vlength, List tags) throws IOException { + return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, + qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags); + } }