diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index ff09ea6c45..2dd3c3440d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -32,6 +32,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; + +import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -1846,15 +1848,87 @@ public class KeyValue implements ExtendedCell, Cloneable { * rowkey, colfam/qual, timestamp, type, mvcc */ @Override - public int compare(final Cell left, final Cell right) { - int compare = CellComparatorImpl.COMPARATOR.compare(left, right); - return compare; + public int compare(final Cell leftCell, final Cell rightCell) { + // we know all are Keyvalues only + KeyValue left = (KeyValue) leftCell; + KeyValue right = (KeyValue) rightCell; + int lRowLen = left.getRowLength(); + int lRowOffset = left.getRowOffset(); + int rRowLen = right.getRowLength(); + int rRowOffset = right.getRowOffset(); + int diff = compareRows(left.getRowArray(), lRowOffset, lRowLen, right.getRowArray(), + rRowOffset, rRowLen); + if (diff != 0) { + return diff; + } else { + // pass the row offset and row length + return compareWithoutRow(left, right, lRowOffset, lRowLen, rRowOffset, rRowLen); + } + } + + public int compareWithoutRow(KeyValue left, KeyValue right, int lRowOffset, int lRowLen, + int rRowOffset, int rRowLen) { + int lFamOffset = left.getFamilyOffset(lRowLen); + int rFamOffset = right.getFamilyOffset(rRowLen); + int lFamLength = left.getFamilyLength(lFamOffset); + int rFamLength = right.getFamilyLength(rFamOffset); + int lQualLength = left.getQualifierLength(lRowLen, lFamLength); + int rQualLength = right.getQualifierLength(rRowLen, rFamLength); + int lQualOffset = left.getQualifierOffset(lFamOffset); + int rQualOffset = right.getQualifierOffset(rFamOffset); + if (lFamLength + lQualLength == 0 && left.getTypeByte() == Type.Minimum.getCode()) { + // left is "bigger", i.e. it appears later in the sorted order + return 1; + } + if (rFamLength + rQualLength == 0 && right.getTypeByte() == Type.Minimum.getCode()) { + return -1; + } + if (lFamLength != rFamLength) { + // comparing column family is enough. + return compareFamilies(left, lFamOffset, lFamLength, right, rFamOffset, rFamLength); + } + // Compare cf:qualifier + int diff = compareColumns(left, right, lFamOffset, rFamOffset, + lFamLength, rFamLength, lQualOffset, rQualOffset, lQualLength, rQualLength); + if (diff != 0) { + return diff; + } + + diff = compareTimestamps(left.getTimestamp(), right.getTimestamp()); + if (diff != 0) { + return diff; + } + + // Compare types. Let the delete types sort ahead of puts; i.e. types + // of higher numbers sort before those of lesser numbers. Maximum (255) + // appears ahead of everything, and minimum (0) appears after + // everything. + return (0xff & right.getTypeByte()) - (0xff & left.getTypeByte()); + } + + public final int compareFamilies(KeyValue left, int lFOffset, int lFlength, KeyValue right, + int rQOffset, int rFLength) { + + return Bytes.compareTo(left.getFamilyArray(), lFOffset, lFlength, right.getFamilyArray(), + rFLength, rFLength); } public int compareTimestamps(final Cell left, final Cell right) { return CellComparatorImpl.COMPARATOR.compareTimestamps(left, right); } + public int compareColumns(final Cell left, final Cell right, int lFamOffset, int rFamOffset, + int lFamLength, int rFamLength, int lQualOffset, int rQualOffset, int lQualLength, + int rQualLength) { + int diff = Bytes.compareTo(left.getFamilyArray(), lFamOffset, lFamLength, + right.getFamilyArray(), rFamOffset, rFamLength); + if (diff != 0) { + return diff; + } else { + return Bytes.compareTo(left.getQualifierArray(), lQualOffset, lQualLength, + right.getQualifierArray(), rQualOffset, rQualLength); + } + } /** * @param left * @param right @@ -1875,8 +1949,8 @@ public class KeyValue implements ExtendedCell, Cloneable { * @param rlength * @return 0 if equal, <0 if left smaller, >0 if right smaller */ - public int compareRows(byte [] left, int loffset, int llength, - byte [] right, int roffset, int rlength) { + public int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset, + int rlength) { return Bytes.compareTo(left, loffset, llength, right, roffset, rlength); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index f4ad1f25fe..2fc1e90cb9 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -144,4 +144,114 @@ public class PutSortReducer extends } } } + + private void reduce1Row(Put put) throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet<>(KeyValue.COMPARATOR); + long curSize = 0; + List tags = new ArrayList<>(); + tags.clear(); + Put p = put; + long t = p.getTTL(); + if (t != Long.MAX_VALUE) { + // add TTL tag if found + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); + } + byte[] acl = p.getACL(); + if (acl != null) { + // add ACL tag if found + tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); + } + try { + CellVisibility cellVisibility = p.getCellVisibility(); + if (cellVisibility != null) { + // add the visibility labels if any + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibility.getExpression())); + } + } catch (DeserializationException e) { + // We just throw exception here. Should we allow other mutations to proceed by + // just ignoring the bad one? + throw new IOException("Invalid visibility expression found in mutation " + p, e); + } + for (List cells : p.getFamilyCellMap().values()) { + for (Cell cell : cells) { + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. + KeyValue kv = null; + TagUtil.carryForwardTags(tags, cell); + if (!tags.isEmpty()) { + kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength(), tags); + } else { + kv = KeyValueUtil.ensureKeyValue(cell); + } + if (map.add(kv)) {// don't count duplicated kv into size + curSize += kv.heapSize(); + } + } + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + PutSortReducer psr = new PutSortReducer(); + TsvImporterMapper tim = new TsvImporterMapper(); + Configuration conf = new Configuration(); + conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/home/123"); + StringBuilder sb = new StringBuilder(); + sb.append("HBASE_ROW_KEY,HBASE_TS_KEY,d:time,d:num,d:bid,d:prol,d:up,d:down"); + for (int i = 7; i <= 300; i++) { + sb.append(",d:col" + i); + } + + ImportTsv.TsvParser parser = new ImportTsv.TsvParser(sb.toString(), ","); + tim.parser = parser; + StringBuilder sb1 = new StringBuilder(); + sb1.append("18600000001201501011000000068100,18600000001,20150101,7,325,281701,1724,1724"); + for (int i = 7; i <= 300; i++) { + sb1.append(",1111"); + } + byte[] lineBytes = Bytes.toBytes(sb1.toString()); + + try { + ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, sb1.length()); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength()); + + Put put = new Put(rowKey.copyBytes()); + List cells = new ArrayList(); + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() + || i == parser.getAttributesKeyColumnIndex() + || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex() + || (false && parsed.getColumnLength(i) == 0)) { + continue; + } + Cell cell = psr.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, + parser.getQualifier(i).length, parsed.getTimestamp(0), lineBytes, + parsed.getColumnOffset(i), parsed.getColumnLength(i), new ArrayList<>()); + put.add(cell); + } + // int rows = Integer.parseInt(args[0]); + int rows = Integer.parseInt("100000"); + long t1 = System.currentTimeMillis(); + for (int j = 0; j < rows; j++) { + psr.reduce1Row(put); + } + System.out.println("Time took for reduce process is " + (System.currentTimeMillis() - t1)); + + } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException badLine) { + badLine.printStackTrace(); + } + } + + public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, + byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset, + int vlength, List tags) throws IOException { + return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, + qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags); + } }