diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 9a76932..11f624e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -314,18 +314,31 @@ public class StoreFile {
   }
 
   /**
-   * @return true if this storefile was created by HFileOutputFormat
-   * for a bulk load.
+   * Check if this storefile was created by bulk load.
+   * When an hfile is bulk loaded into HBase, we append
+   * '_SeqId_<id-when-loaded>' to the hfile name, unless
+   * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
+   * explicitly turned off.
+   * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
+   * is turned off, fall back to BULKLOAD_TIME_KEY.
+   * @return true if this storefile was created by bulk load.
    */
   boolean isBulkLoadResult() {
-    return metadataMap.containsKey(BULKLOAD_TIME_KEY);
+    boolean bulkLoadedHFile = false;
+    String fileName = this.getPath().getName();
+    int startPos = fileName.indexOf("SeqId_");
+    if (startPos != -1) {
+      bulkLoadedHFile = true;
+    }
+    return metadataMap.containsKey(BULKLOAD_TIME_KEY) || bulkLoadedHFile;
   }
 
   /**
    * Return the timestamp at which this bulk load file was generated.
    */
   public long getBulkLoadTimestamp() {
-    return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
+    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
+    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
   }
 
   /**
@@ -371,7 +384,8 @@
       // generate the sequenceId from the fileName
       // fileName is of the form <originalName>_SeqId_<id-when-loaded>_
       String fileName = this.getPath().getName();
-      int startPos = fileName.indexOf("SeqId_");
+      // Use lastIndexOf() to get the last, most recent bulk load seqId.
+      int startPos = fileName.lastIndexOf("SeqId_");
       if (startPos != -1) {
         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
             fileName.indexOf('_', startPos + 6)));
@@ -380,6 +394,7 @@
           this.sequenceid += 1;
         }
       }
+      this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);
 
@@ -1008,6 +1023,7 @@ public class StoreFile {
     protected long sequenceID = -1;
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
+    private boolean bulkLoadResult = false;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1050,6 +1066,9 @@ public class StoreFile {
 
     /**
      * Get a scanner to scan over this StoreFile.
+     * Bulk loaded files may or may not have mvcc info.
+     * We will consistently ignore MVCC info in bulk loaded files.
+     * They will be visible to scanners immediately following the bulk load.
      *
      * @param cacheBlocks should this scanner cache blocks?
      * @param pread use pread (for highly concurrent small readers)
@@ -1061,7 +1080,8 @@
         boolean isCompaction, long readPt) {
       return new StoreFileScanner(this,
          getScanner(cacheBlocks, pread, isCompaction),
-         !isCompaction, reader.hasMVCCInfo(), readPt);
+         !isCompaction, reader.hasMVCCInfo() && !this.bulkLoadResult,
+         readPt);
     }
 
     /**
@@ -1508,6 +1528,14 @@
     public long getMaxTimestamp() {
       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
     }
+
+    public void setBulkLoaded(boolean bulkLoadResult) {
+      this.bulkLoadResult = bulkLoadResult;
+    }
+
+    public boolean isBulkLoaded() {
+      return this.bulkLoadResult;
+    }
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
new file mode 100644
index 0000000..12bb40d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestScannerWithBulkload {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  private static void createTable(HBaseAdmin admin, String tableName) throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("col");
+    hcd.setMaxVersions(3);
+    desc.addFamily(hcd);
+    admin.createTable(desc);
+  }
+
+  @Test
+  public void testBulkLoad() throws Exception {
+    String tableName = "testBulkLoad";
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
+        false);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    result = scanAfterBulkLoad(scanner, result, "version2");
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version3")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    scanner = table.getScanner(scan);
+    result = scanner.next();
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    scanner.close();
+    table.close();
+  }
+
+  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
+      throws IOException {
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals(expectedVal, Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    return result;
+  }
+
+  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
+  // Otherwise, we will set BULKLOAD_TIME_KEY.
+  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
+      throws IOException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    final Path hfilePath = new Path(hFilePath);
+    fs.mkdirs(hfilePath);
+    Path path = new Path(pathStr);
+    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
+    Assert.assertNotNull(wf);
+    HFileContext context = new HFileContext();
+    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
+    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+        Bytes.toBytes("version2"));
+
+    // Set cell mvcc to test bulk load native hfiles.
+    if (nativeHFile) {
+      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
+      // Scan should only look at the seq id appended at the bulk load time, and not skip
+      // this kv.
+      kv.setMvccVersion(9999999);
+    }
+
+    writer.append(kv);
+
+    if (nativeHFile) {
+      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
+      // Scan should only look at the seq id appended at the bulk load time, and not skip its
+      // kv.
+      writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
+    }
+    else {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+    }
+    writer.close();
+    return hfilePath;
+  }
+
+  private HTable init(HBaseAdmin admin, long l, Scan scan, String tableName) throws Exception {
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version0")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    Put put1 = new Put(Bytes.toBytes("row2"));
+    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version0")));
+    table.put(put1);
+    table.flushCommits();
+    admin.flush(tableName);
+    admin.close();
+    put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version1")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    admin.compact(tableName);
+
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
+    scanner.close();
+    return table;
+  }
+
+  @Test
+  public void testBulkLoadWithParallelScan() throws Exception {
+    String tableName = "testBulkLoadWithParallelScan";
+    final long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
+        "/temp/testBulkLoadWithParallelScan/col/file", false);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    ResultScanner scanner = table.getScanner(scan);
+    // Create a scanner and then do bulk load
+    final CountDownLatch latch = new CountDownLatch(1);
+    new Thread() {
+      public void run() {
+        try {
+          Put put1 = new Put(Bytes.toBytes("row5"));
+          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
+              Bytes.toBytes("version0")));
+          table.put(put1);
+          table.flushCommits();
+          bulkload.doBulkLoad(hfilePath, table);
+          latch.countDown();
+        } catch (TableNotFoundException e) {
+        } catch (IOException e) {
+        }
+      }
+    }.start();
+    latch.await();
+    // We had 'version0', 'version1' for row1,col:q. Bulk load adds 'version2'.
+    // By the time we do next() the bulk loaded file is also added to the kv
+    // scanner and is immediately visible with no mvcc check.
+    Result result = scanner.next();
+    scanAfterBulkLoad(scanner, result, "version2");
+    scanner.close();
+    table.close();
+
+  }
+
+  @Test
+  public void testBulkLoadNativeHFile() throws Exception {
+    String tableName = "testBulkLoadNativeHFile";
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
+        "/temp/testBulkLoadNativeHFile/col/file", true);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    // We had 'version0', 'version1' for 'row1,col:q' in the table.
+    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
+    result = scanAfterBulkLoad(scanner, result, "version2");
+    scanner.close();
+    table.close();
+  }
+
+  private Scan createScan() {
+    Scan scan = new Scan();
+    scan.setMaxVersions(3);
+    return scan;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
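
Note (illustrative sketch, not part of the patch): per the new isBulkLoadResult() javadoc above, bulk load appends '_SeqId_<id-when-loaded>_' to the hfile name when "hbase.mapreduce.bulkload.assign.sequenceNumbers" is on, and StoreFile.open() recovers the sequence id with lastIndexOf("SeqId_"). The standalone Java snippet below mirrors that parsing; the class name and file name are made up for illustration.

    // Sketch only: mirrors the name-based seqId parsing used in StoreFile.open().
    public class BulkLoadSeqIdParseExample {
      public static void main(String[] args) {
        // Hypothetical bulk-loaded hfile name of the form <originalName>_SeqId_<id-when-loaded>_
        String fileName = "3f2ab24c55f14e4b9a1f0c6d_SeqId_42_";
        int startPos = fileName.lastIndexOf("SeqId_");
        boolean bulkLoaded = false;
        long seqId = -1;
        if (startPos != -1) {
          bulkLoaded = true;
          // "SeqId_" is 6 characters; the id runs up to the next '_' after it.
          seqId = Long.parseLong(fileName.substring(startPos + 6,
              fileName.indexOf('_', startPos + 6)));
        }
        System.out.println("bulkLoaded=" + bulkLoaded + ", seqId=" + seqId); // bulkLoaded=true, seqId=42
      }
    }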