 .../org/apache/hadoop/hbase/CellComparator.java    |  30 +++++
 .../java/org/apache/hadoop/hbase/KeyValue.java     |   5 +
 .../hadoop/hbase/regionserver/KeyValueHeap.java    |  42 +++++-
 .../hadoop/hbase/regionserver/StoreFile.java       |  10 ++
 .../hbase/regionserver/StoreFileScanner.java       |   2 +-
 .../hbase/regionserver/TestScanWithBloomError.java |  11 +-
 .../regionserver/TestScannerWithBulkload.java      | 145 +++++++++++++++++++++
 7 files changed, 238 insertions(+), 7 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index e9c83cc..593cc12 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -78,6 +78,36 @@ public class CellComparator implements Comparator<Cell>, Serializable {
     }
   }
 
+  public static int compareCellsWithoutMvcc(Cell a, Cell b) {
+    // row
+    int c = compareRows(a, b);
+    if (c != 0)
+      return c;
+
+    c = compareWithoutRow(a, b);
+    if (c != 0)
+      return c;
+
+    // Negate the following comparisons so later edits show up first.
+
+    // Compare log replay tag values if there are any.
+    // When either keyvalue is tagged with a log replay sequence number, we
+    // need to compare them:
+    // 1) when both keyvalues have the tag, use the tag values for the
+    // comparison
+    // 2) when one has the tag and the other doesn't, the one without the log
+    // replay tag wins, because
+    // it means the edit isn't from recovery but a new one coming from clients
+    // during recovery
+    // 3) when neither has the tag, skip to the next mvcc comparison
+    long leftChangeSeqNum = getReplaySeqNum(a);
+    long rightChangeSeqNum = getReplaySeqNum(b);
+    if (leftChangeSeqNum != Long.MAX_VALUE || rightChangeSeqNum != Long.MAX_VALUE) {
+      return Longs.compare(rightChangeSeqNum, leftChangeSeqNum);
+    }
+    return 0;
+  }
+
   /**
    * Return replay log sequence number for the cell
    *
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index b9aa2f3..86eed0a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -1901,6 +1901,11 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
       return compare;
     }
 
+    public int compareWithoutMvcc(final Cell left, final Cell right) {
+      int compare = CellComparator.compareCellsWithoutMvcc(left, right);
+      return compare;
+    }
+
     public int compareTimestamps(final Cell left, final Cell right) {
       return CellComparator.compareTimestamps(left, right);
     }
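The two hunks above introduce a key comparison that ignores the mvcc (memstore) sequence number. A minimal sketch of the resulting semantics, assuming the patched hbase-common on the classpath (the class name and values are illustrative only, not part of the patch): two cells that agree on row, column and timestamp now compare as equal even though their sequence ids differ, which is exactly the collision a bulk-loaded HFile produces.

import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class WithoutMvccExample {
  public static void main(String[] args) {
    // Same row/family/qualifier/timestamp, different values and sequence ids.
    KeyValue flushed = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"),
        Bytes.toBytes("q"), 100L, Bytes.toBytes("version1"));
    KeyValue bulkLoaded = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"),
        Bytes.toBytes("q"), 100L, Bytes.toBytes("version2"));
    flushed.setSequenceId(5);    // mvcc assigned when the memstore was flushed
    bulkLoaded.setSequenceId(0); // cells inside a bulk-loaded HFile carry mvcc 0

    // Prints 0: without mvcc the keys tie, so KVScannerComparator can fall
    // back to the scanners' sequence ids to decide which edit is newer.
    System.out.println(CellComparator.compareCellsWithoutMvcc(flushed, bulkLoaded));
  }
}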
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 69bfcdf..c3d8b4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -26,7 +26,10 @@ import java.util.PriorityQueue;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 
 /**
  * Implements a heap merge across any number of KeyValueScanners.
@@ -177,8 +180,23 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     public KVScannerComparator(KVComparator kvComparator) {
       this.kvComparator = kvComparator;
     }
+
     public int compare(KeyValueScanner left, KeyValueScanner right) {
-      int comparison = compare(left.peek(), right.peek());
+      boolean bulkLoad = false;
+      if (left.isFileScanner()) {
+        Reader reader = ((StoreFileScanner) left).getReader();
+        bulkLoad = reader.isBulkLoadResult();
+      }
+      if (!bulkLoad && right.isFileScanner()) {
+        Reader reader = ((StoreFileScanner) right).getReader();
+        bulkLoad = reader.isBulkLoadResult();
+      }
+      int comparison = 0;
+      if (bulkLoad) {
+        comparison = compareWithoutMvcc(left.peek(), right.peek());
+      } else {
+        comparison = compare(left.peek(), right.peek());
+      }
       if (comparison != 0) {
         return comparison;
       } else {
@@ -187,6 +205,16 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
         long leftSequenceID = left.getSequenceID();
         long rightSequenceID = right.getSequenceID();
         if (leftSequenceID > rightSequenceID) {
+          if (bulkLoad) {
+            // TODO: Avoid this conversion in the read path once everything is Cell-based.
+            KeyValue leftKV = KeyValueUtil.ensureKeyValue(left.peek());
+            KeyValue rightKV = KeyValueUtil.ensureKeyValue(right.peek());
+            if (leftKV.getSequenceId() == 0) {
+              leftKV.setSequenceId(rightKV.getSequenceId());
+            } else {
+              rightKV.setSequenceId(leftKV.getSequenceId());
+            }
+          }
           return -1;
         } else if (leftSequenceID < rightSequenceID) {
           return 1;
@@ -204,6 +232,18 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     public int compare(Cell left, Cell right) {
       return this.kvComparator.compare(left, right);
     }
+
+    /**
+     * Compares two cells without considering their mvcc sequence numbers
+     *
+     * @param left the left cell
+     * @param right the right cell
+     * @return less than 0 if left is smaller, 0 if equal, greater than 0 otherwise
+     */
+    public int compareWithoutMvcc(Cell left, Cell right) {
+      return this.kvComparator.compareWithoutMvcc(left, right);
+    }
+
     /**
      * @return KVComparator
      */
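The comparator change above is the heart of the patch, so a condensed restatement may help. This is a sketch, not the patch itself; isBulkLoaded is a hypothetical helper whose possible shape is shown after the StoreFile and StoreFileScanner hunks below, and the patched CellComparator is assumed on the classpath.

// Condensed restatement of the patched KVScannerComparator.compare(scanner, scanner).
int order(KeyValueScanner left, KeyValueScanner right, KVComparator kvComparator) {
  // If either side reads a bulk-loaded file, exclude mvcc from the key comparison.
  boolean bulkLoad = isBulkLoaded(left) || isBulkLoaded(right);
  int c = bulkLoad
      ? CellComparator.compareCellsWithoutMvcc(left.peek(), right.peek())
      : kvComparator.compare(left.peek(), right.peek());
  if (c != 0) {
    return c;
  }
  // Keys tie: the scanner with the higher sequence id holds the newer edit
  // and must leave the heap first, hence the reversed comparison.
  return Long.compare(right.getSequenceID(), left.getSequenceID());
}

One thing this sketch omits: when the keys tie in bulk-load mode, the patch also copies the winning scanner's sequence id onto the tied KeyValue that still carries sequence id 0 (the bulk-loaded one), so the cell keeps its relative position after it leaves the heap.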
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index ec164ca..9aa8d34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -381,6 +381,7 @@ public class StoreFile {
           this.sequenceid += 1;
         }
       }
+      this.reader.setBulkLoadResult(true);
     }
 
     this.reader.setSequenceID(this.sequenceid);
@@ -1009,6 +1010,7 @@ public class StoreFile {
     protected long sequenceID = -1;
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
+    private boolean bulkLoadResult = false;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1475,6 +1477,14 @@ public class StoreFile {
       this.sequenceID = sequenceID;
     }
 
+    public void setBulkLoadResult(boolean bulkLoadResult) {
+      this.bulkLoadResult = bulkLoadResult;
+    }
+
+    public boolean isBulkLoadResult() {
+      return this.bulkLoadResult;
+    }
+
     BloomFilter getGeneralBloomFilter() {
       return generalBloomFilter;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 1b07594..21b85ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -349,7 +349,7 @@ public class StoreFileScanner implements KeyValueScanner {
     return true;
   }
 
-  Reader getReaderForTesting() {
+  Reader getReader() {
     return reader;
   }
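The new isBulkLoadResult() flag on StoreFile.Reader, together with the getReaderForTesting() to getReader() rename, is what the heap comparator consults. The hypothetical helper referenced in the earlier sketch could look like this; the patch inlines the same logic directly in KVScannerComparator.

// Illustrative only: how a caller can detect a bulk-loaded scanner. The flag
// is set in StoreFile.open() when the file was created by a bulk load.
static boolean isBulkLoaded(KeyValueScanner scanner) {
  if (!scanner.isFileScanner()) {
    return false; // memstore scanners can never be bulk-load results
  }
  StoreFile.Reader reader = ((StoreFileScanner) scanner).getReader();
  return reader.isBulkLoadResult();
}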
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
index 011de0f..54e8517 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,8 +56,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.*;
-
 /**
  * Test a multi-column scanner when there is a Bloom filter false-positive.
  * This is needed for the multi-column Bloom filter optimization.
@@ -132,8 +133,8 @@ public class TestScanWithBloomError {
     Collections.sort(scanners, new Comparator<StoreFileScanner>() {
       @Override
       public int compare(StoreFileScanner s1, StoreFileScanner s2) {
-        Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
-        Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
+        Path p1 = s1.getReader().getHFileReader().getPath();
+        Path p2 = s2.getReader().getHFileReader().getPath();
         long t1, t2;
         try {
           t1 = fs.getFileStatus(p1).getModificationTime();
@@ -147,7 +148,7 @@ public class TestScanWithBloomError {
 
     StoreFile.Reader lastStoreFileReader = null;
     for (StoreFileScanner sfScanner : scanners)
-      lastStoreFileReader = sfScanner.getReaderForTesting();
+      lastStoreFileReader = sfScanner.getReader();
 
     new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
         lastStoreFileReader.getHFileReader().getPath().toString()});
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
new file mode 100644
index 0000000..8b82ded
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestScannerWithBulkload {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String tableName = "testBulkload";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("col");
+    hcd.setMaxVersions(3);
+    desc.addFamily(hcd);
+    admin.createTable(desc);
+    admin.close();
+  }
+
+  @Test
+  public void testBulkLoad() throws Exception {
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+        Bytes.toBytes("version0")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    Put put1 = new Put(Bytes.toBytes("row2"));
+    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+        Bytes.toBytes("version0")));
+    table.put(put1);
+    table.flushCommits();
+    // table.close();
+    admin.flush(tableName);
+    admin.close();
+    put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+        Bytes.toBytes("version1")));
+    table.put(put0);
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    admin.compact(tableName);
+
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Scan scan = new Scan();
+    scan.setMaxVersions(3);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
+
+    // use bulkload
+    HTable table1 = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    Path hfilePath = new Path("/temp/bulkload/");
+    fs.mkdirs(hfilePath);
+    Path path = new Path("/temp/bulkload/col/file");
+    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
+    Assert.assertNotNull(wf);
+    HFileContext context = new HFileContext();
+    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
+    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+        Bytes.toBytes("version2"));
+    writer.append(kv);
+    // Add the bulk-load time key; otherwise the file cannot be recognized as a
+    // bulk-loaded file.
+    writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+    writer.close();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table1);
+    table1.close();
+    table.close();
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    scanner = table.getScanner(scan);
+    result = scanner.next();
+    while (result != null) {
+      kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals("version2", Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    table.close();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
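A closing note on the configuration the test depends on: hbase.mapreduce.bulkload.assign.sequenceNumbers makes the region server assign a fresh sequence id to each bulk-loaded file; without it the loaded file keeps sequence id 0, and older flushed cells with equal keys can shadow the loaded ones. A minimal sketch of that setup, reusing the path and table name from the test above and assuming a reachable cluster behind the default configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadWithSeqNum {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ask the region server for a fresh sequence id at load time so the new
    // file sorts as newer than previously flushed files with equal keys.
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path("/temp/bulkload/"), new HTable(conf, "testBulkload"));
  }
}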