Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java	(revision 0)
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles and shows the atomicity (or lack of atomicity)
+ * of the region server's bulkLoad functionality.
+ */
+public class TestHRegionServerBulkLoad {
+  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private final static Configuration conf = UTIL.getConfiguration();
+  private final static byte[] QUAL = Bytes.toBytes("qual");
+  private final static int NUM_CFS = 10;
+  public static int BLOCKSIZE = 64 * 1024;
+  public static String COMPRESSION = Compression.Algorithm.NONE.getName();
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /**
+   * Create an HFile with the given number of rows with a specified value.
+   */
+  public static void createHFile(FileSystem fs, Path path, byte[] family,
+      byte[] qualifier, byte[] value, int numRows) throws IOException {
+    HFile.Writer writer = HFile
+        .getWriterFactory(conf, new CacheConfig(conf))
+        .createWriter(fs, path, BLOCKSIZE, COMPRESSION, KeyValue.KEY_COMPARATOR);
+    long now = System.currentTimeMillis();
+    try {
+      for (int i = 0; i < numRows; i++) {
+        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Thread that does continuous bulk loads of HFiles into the table.
+   *
+   * Each iteration loads 10 HFiles (one per column family), each of which
+   * occupies roughly 5 open file handles, so every 10 iterations (about 500
+   * open file handles) it requests a region compaction to reduce the number
+   * of open file handles.
+   */
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private String tableName;
+
+    public AtomicHFileLoader(String tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir = UTIL.getDataTestDir(String.format("bulkLoad_%08d",
+          iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+          NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      HConnection conn = UTIL.getHBaseAdmin().getConnection();
+      byte[] tbl = Bytes.toBytes(tableName);
+      conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
+          .toBytes("aaa")) {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          server.bulkLoadHFiles(famPaths, regionName);
+          return null;
+        }
+      });
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 10 == 0) {
+        // 10 * 50 = 500 open file handles!
+        conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
+            Bytes.toBytes("aaa")) {
+          @Override
+          public Void call() throws Exception {
+            LOG.debug("compacting " + location + " for row "
+                + Bytes.toStringBinary(row));
+            server.compactRegion(location.getRegionInfo(), true);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially
+   * completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+    String TABLE_NAME;
+
+    public AtomicScanReader(String TABLE_NAME, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.TABLE_NAME = TABLE_NAME;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(conf, TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+
+      for (Result res : scanner) {
+        byte[] lastRow = null, lastFam = null, lastQual = null;
+        byte[] gotValue = null;
+        for (byte[] family : targetFamilies) {
+          byte qualifier[] = QUAL;
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && thisValue != null
+              && !Bytes.equals(gotValue, thisValue)) {
+
+            StringBuilder msg = new StringBuilder();
+            msg.append("Failed on scan ").append(numScans)
+                .append(" after scanning ").append(numRowsScanned)
+                .append(" rows!\n");
+            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
+                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+                + " = " + Bytes.toString(thisValue) + "\n");
+            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
+                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+                + " = " + Bytes.toString(gotValue));
+            throw new RuntimeException(msg.toString());
+          }
+
+          lastFam = family;
+          lastQual = qualifier;
+          lastRow = res.getRow();
+          gotValue = thisValue;
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+  }
+
+  /**
+   * Creates a table with the given table name and the specified number of
+   * column families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < cfs; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = UTIL.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Atomic bulk load.
+   */
+  @Test
+  public void testAtomicBulkLoad() throws Exception {
+    String TABLE_NAME = "atomicBulkLoad";
+
+    int millisToRun = 30000;
+    int numScanners = 50;
+
+    UTIL.startMiniCluster(1);
+    try {
+      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compactions " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  /**
+   * Run the test on an HBase instance for 5 minutes. This assumes that the
+   * table under test only has a single region.
+   */
+  public static void main(String args[]) throws Exception {
+    try {
+      Configuration c = HBaseConfiguration.create();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      test.setConf(c);
+      test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
+    } finally {
+      System.exit(0); // something hangs (believe it is lru threadpool)
+    }
+  }
+
+  private void setConf(Configuration c) {
+    UTIL = new HBaseTestingUtility(c);
+  }
+}
Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(revision 1195063)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(working copy)
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,7 +43,6 @@
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -205,7 +206,9 @@
     byte [] row = Bytes.toBytes(tableNameStr);
     writer.append(new KeyValue(row, family, family, row));
     writer.close();
-    region.bulkLoadHFile(f.toString(), family);
+    List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
+    hfs.add(Pair.newPair(family, f.toString()));
+    region.bulkLoadHFiles(hfs);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();
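
The TestWALReplay change above is the simplest illustration of the API shift: one bulkLoadHFiles call now carries every (family, hfile) pair that must commit together. A minimal sketch of loading two families atomically through the new HRegion entry point (a sketch only; the family names and paths here are hypothetical):

    // Build the (family, HFile path) pairs; the whole list is the unit of atomicity.
    List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(2);
    famPaths.add(Pair.newPair(Bytes.toBytes("cf1"), "/bulk/out/cf1/hfile"));
    famPaths.add(Pair.newPair(Bytes.toBytes("cf2"), "/bulk/out/cf2/hfile"));
    // Under the old API this took one bulkLoadHFile call per family, so a
    // reader could briefly see cf1's data without cf2's; the new call
    // validates and loads every pair under the region's write lock.
    region.bulkLoadHFiles(famPaths);
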
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java	(revision 0)
@@ -0,0 +1,324 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load
+ * functionality.
+ */
+public class TestLoadIncrementalHFilesSplitRecovery {
+  final static Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
+
+  private static HBaseTestingUtility util;
+
+  final static int NUM_CFS = 10;
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  final static int ROWCOUNT = 100;
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format("%010d", i));
+  }
+
+  public static void buildHFiles(FileSystem fs, Path dir, int value)
+      throws IOException {
+    byte[] val = value(value);
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  /**
+   * Creates a table with the given table name and the specified number of
+   * column families if the table does not already exist.
+   */
+  private void setupTable(String table, int cfs) throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      HTableDescriptor htd = new HTableDescriptor(table);
+      for (int i = 0; i < cfs; i++) {
+        htd.addFamily(new HColumnDescriptor(family(i)));
+      }
+
+      HBaseAdmin admin = util.getHBaseAdmin();
+      admin.createTable(htd);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  void assertExpectedTable(String table, int count, int value) {
+    try {
+      assertEquals(1, util.getHBaseAdmin().listTables(table).length);
+
+      HTable t = new HTable(util.getConfiguration(), table);
+      Scan s = new Scan();
+      ResultScanner sr = t.getScanner(s);
+      int i = 0;
+      for (Result r : sr) {
+        i++;
+        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
+          for (byte[] val : nm.values()) {
+            assertTrue(Bytes.equals(val, value(value)));
+          }
+        }
+      }
+      assertEquals(count, i);
+    } catch (IOException e) {
+      fail("Failed due to exception");
+    }
+  }
+
+  @Test
+  public void testBulkLoadPhaseRecovery() throws Exception {
+    String table = "bulkPhaseRetry";
+    setupTable(table, 10);
+
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    final AtomicInteger failedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attemptedCalls.incrementAndGet();
+        if (i == 1) {
+          HConnection errConn = mock(HConnection.class);
+          try {
+            doThrow(new IOException("injecting bulk load error")).when(errConn)
+                .getRegionServerWithRetries((ServerCallable<Void>) anyObject());
+          } catch (Exception e) {
+            LOG.fatal("mocking cruft, should never happen", e);
+            throw new RuntimeException("mocking cruft, should never happen");
+          }
+          failedCalls.incrementAndGet();
+          return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    FileSystem fs = util.getTestFileSystem();
+    Path dir = util.getDataTestDir(table);
+    buildHFiles(fs, dir, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(dir, t);
+
+    // check that data was loaded
+    assertEquals(2, attemptedCalls.get());
+    assertEquals(1, failedCalls.get());
+    assertExpectedTable(table, ROWCOUNT, 1);
+  }
+
+  /**
+   * This test exercises the path where there is a split after the initial
+   * validation but before the atomic bulk load call. We cannot use
+   * presplitting to test this path, so we actually inject a split just
+   * before the atomic region load.
+   */
+  @Test
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final String table = "bulkPhaseSplit";
+    setupTable(table, 10);
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration());
+
+    // create HFiles for different column families
+    Path dir = util.getDataTestDir(table);
+    Path bulk1 = new Path(dir, "normalBulkload");
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, bulk1, 1);
+    HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+    lih.doBulkLoad(bulk1, t);
+    assertExpectedTable(table, ROWCOUNT, 1);
+
+    // Now let's cause trouble
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+
+      protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+          byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+        int i = attemptedCalls.incrementAndGet();
+        if (i == 1) {
+          // On the first attempt, force a split.
+          try {
+            // We need the split to be synchronous, but the region server call
+            // that would let us wait on it is not visible, so we trigger the
+            // split and then poll until both daughters are online.
+            HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
+                .toBytes(table));
+
+            for (HRegionInfo hri : hrs.getOnlineRegions()) {
+              if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+                // splitRegion doesn't work if startkey/endkey are null
+                hrs.splitRegion(hri, rowkey(ROWCOUNT / 2)); // hard-coded split point
+              }
+            }
+
+            int regions;
+            do {
+              regions = 0;
+              for (HRegionInfo hri : hrs.getOnlineRegions()) {
+                if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
+                  regions++;
+                }
+              }
+              if (regions != 2) {
+                LOG.info("Taking some time to complete split...");
+                Thread.sleep(250);
+              }
+            } while (regions != 2);
+          } catch (IOException e) {
+            LOG.error("Failed to force a split", e);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted while waiting for split", e);
+          }
+        }
+
+        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+      }
+    };
+
+    // create HFiles for different column families
+    Path bulk2 = new Path(dir, "bulkload2");
+    buildHFiles(fs, bulk2, 2); // all values are '2'
+    lih2.doBulkLoad(bulk2, t);
+
+    // check that data was loaded
+
+    // The three expected attempts are 1) failure because a split is needed,
+    // 2) load of the split's top half, and 3) load of the split's bottom half.
+    assertEquals(3, attemptedCalls.get());
+    assertExpectedTable(table, ROWCOUNT, 2);
+  }
+
+  @Test(expected = IOException.class)
+  public void testGroupOrSplitFailure() throws Exception {
+    String table = "groupOrSplitStoreFail";
+    setupTable(table, 10);
+
+    try {
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+          util.getConfiguration()) {
+        int i = 0;
+
+        protected List<LoadQueueItem> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+            final LoadQueueItem item, final HTable table,
+            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          i++;
+
+          if (i == 5) {
+            throw new IOException("failure");
+          }
+          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+        }
+      };
+
+      Path dir = util.getDataTestDir(table);
+
+      // create HFiles for different column families
+      FileSystem fs = util.getTestFileSystem();
+      buildHFiles(fs, dir, 1);
+      HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
+      lih.doBulkLoad(dir, t);
+
+      // check that data was loaded
+      assertExpectedTable(table, ROWCOUNT, 1);
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+}
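
All three tests rely on the same seam: LoadIncrementalHFiles makes tryAtomicRegionLoad and groupOrSplit protected precisely so that failures can be injected by subclassing. Distilled, the pattern looks like this (a sketch; the counter and the failure condition are illustrative, not part of the patch):

    final AtomicInteger calls = new AtomicInteger();
    LoadIncrementalHFiles flaky = new LoadIncrementalHFiles(conf) {
      protected List<LoadQueueItem> tryAtomicRegionLoad(HConnection conn,
          byte[] tableName, byte[] first, Collection<LoadQueueItem> lqis) {
        if (calls.incrementAndGet() == 1) {
          // Simulate one failed region-server call: hand every item back so
          // the outer doBulkLoad loop regroups and retries it.
          return new ArrayList<LoadQueueItem>(lqis);
        }
        return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
      }
    };
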
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1195063)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -105,6 +105,7 @@
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
@@ -2781,24 +2782,90 @@
     return lid;
   }
 
-  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  /**
+   * Attempts to atomically load a group of hfiles. This is critical for loading
+   * rows with multiple column families atomically.
+   *
+   * @param familyPaths List of Pair&lt;byte[] column family, String hfilePath&gt;
+   */
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
   throws IOException {
-    startRegionOperation();
+    startBulkRegionOperation();
     this.writeRequestsCount.increment();
+    List<IOException> ioes = new ArrayList<IOException>();
+    List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
+    boolean rangesOk = true;
     try {
-      Store store = getStore(familyName);
-      if (store == null) {
-        throw new DoNotRetryIOException(
-            "No such column family " + Bytes.toStringBinary(familyName));
+      // There possibly was a split that happened between when the split keys
+      // were gathered and when the HRegion's write lock was taken. We need
+      // to validate every HFile against its region before attempting to bulk
+      // load any of them.
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+
+        Store store = getStore(familyName);
+        if (store == null) {
+          IOException ioe = new DoNotRetryIOException(
+              "No such column family " + Bytes.toStringBinary(familyName));
+          ioes.add(ioe);
+          failures.add(p);
+          continue;
+        }
+
+        try {
+          store.assertBulkLoadHFileOk(new Path(path));
+        } catch (IOException ioe) {
+          rangesOk = false;
+          ioes.add(ioe);
+          failures.add(p);
+        }
+      }
+
+      if (ioes.size() != 0) {
+        // validation failed, bail out before doing anything permanent.
+        return;
       }
-      store.bulkLoadHFile(hfilePath);
+
+      for (Pair<byte[], String> p : familyPaths) {
+        byte[] familyName = p.getFirst();
+        String path = p.getSecond();
+        Store store = getStore(familyName);
+        try {
+          store.bulkLoadHFile(path);
+        } catch (IOException ioe) {
+          // a failure here causes an atomicity violation that we currently
+          // cannot recover from since it is likely a failed hdfs operation.
+          ioes.add(ioe);
+          failures.add(p);
+          break;
+        }
+      }
     } finally {
-      closeRegionOperation();
+      closeBulkRegionOperation();
+      if (ioes.size() != 0) {
+        StringBuilder list = new StringBuilder();
+        for (Pair<byte[], String> p : failures) {
+          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
+              .append(p.getSecond());
+        }
+
These " + + "(family,hfile) pairs were not loaded: " + list); + } else { + // problem when validating + LOG.info("There was a recoverable bulk load failure likely due to a" + + " split. These (family, HFile) pairs were not loaded: " + list); + } + + if (ioes.size() == 1) { + throw ioes.get(0); + } + throw MultipleIOException.createIOException(ioes); + } } - } - @Override public boolean equals(Object o) { if (!(o instanceof HRegion)) { @@ -4380,6 +4447,34 @@ } /** + * This method needs to be called before any public call that reads or + * modifies stores in bulk. It has to be called just before a try. + * #closeBulkRegionOperation needs to be called in the try's finally block + * Acquires a writelock and checks if the region is closing or closed. + * @throws NotServingRegionException when the region is closing or closed + */ + private void startBulkRegionOperation() throws NotServingRegionException { + if (this.closing.get()) { + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closing"); + } + lock.writeLock().lock(); + if (this.closed.get()) { + lock.writeLock().unlock(); + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closed"); + } + } + + /** + * Closes the lock. This needs to be called in the finally block corresponding + * to the try block of #startRegionOperation + */ + private void closeBulkRegionOperation(){ + lock.writeLock().unlock(); + } + + /** * A mocked list implementaion - discards all updates. */ private static final List MOCKED_LIST = new AbstractList() { Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1195063) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -345,9 +345,12 @@ return this.storefiles; } - public void bulkLoadHFile(String srcPathStr) throws IOException { - Path srcPath = new Path(srcPathStr); - + /** + * This throws a WrongRegionException if the bulkHFile does not fit in this + * region. + * + */ + void assertBulkLoadHFileOk(Path srcPath) throws IOException { HFile.Reader reader = null; try { LOG.info("Validating hfile at " + srcPath + " for inclusion in " @@ -371,13 +374,22 @@ HRegionInfo hri = region.getRegionInfo(); if (!hri.containsRange(firstKey, lastKey)) { throw new WrongRegionException( - "Bulk load file " + srcPathStr + " does not fit inside region " + "Bulk load file " + srcPath.toString() + " does not fit inside region " + this.region); } } finally { if (reader != null) reader.close(); } + } + /** + * This method should only be called from HRegion. It is assumed that the + * ranges of values in the HFile fit within the stores assigned region. 
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1195063)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -345,9 +345,12 @@
     return this.storefiles;
   }
 
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
-    Path srcPath = new Path(srcPathStr);
-
+  /**
+   * Throws a WrongRegionException if the HFile to be bulk loaded does not
+   * fit in this region.
+   */
+  void assertBulkLoadHFileOk(Path srcPath) throws IOException {
     HFile.Reader reader = null;
     try {
       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
@@ -371,13 +374,22 @@
       HRegionInfo hri = region.getRegionInfo();
       if (!hri.containsRange(firstKey, lastKey)) {
         throw new WrongRegionException(
-            "Bulk load file " + srcPathStr + " does not fit inside region "
+            "Bulk load file " + srcPath.toString() + " does not fit inside region "
             + this.region);
       }
     } finally {
       if (reader != null) reader.close();
     }
+  }
 
+  /**
+   * This method should only be called from HRegion. It is assumed that the
+   * ranges of values in the HFile fit within the store's assigned region.
+   * (assertBulkLoadHFileOk checks this.)
+   */
+  void bulkLoadHFile(String srcPathStr) throws IOException {
+    Path srcPath = new Path(srcPathStr);
+
     // Move the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);
     if (!srcFs.equals(fs)) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1195063)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -2434,12 +2434,15 @@
     }
   }
 
+  /**
+   * Atomically bulk load several HFiles into an open region.
+   */
   @Override
-  public void bulkLoadHFile(String hfilePath, byte[] regionName,
-      byte[] familyName) throws IOException {
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName) throws IOException {
     checkOpen();
     HRegion region = getRegion(regionName);
-    region.bulkLoadHFile(hfilePath, familyName);
+    region.bulkLoadHFiles(familyPaths);
   }
 
   Map rowlocks = new ConcurrentHashMap();
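
With the server entry point in place, any client (not just LoadIncrementalHFiles) reaches it through a retrying ServerCallable, as the tests do. A minimal sketch of that call, assuming famPaths was built as in the earlier example and that row key "aaa" falls inside the target region:

    conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tableName,
        Bytes.toBytes("aaa")) {
      @Override
      public Void call() throws Exception {
        // `location` and `server` are supplied by ServerCallable once the
        // region hosting `row` has been located.
        byte[] regionName = location.getRegionInfo().getRegionName();
        server.bulkLoadHFiles(famPaths, regionName);
        return null;
      }
    });
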
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(revision 1195063)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java	(working copy)
@@ -21,13 +21,16 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -37,7 +40,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -62,9 +64,9 @@
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -72,9 +74,11 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  * @see #usage()
@@ -87,8 +91,6 @@
   static AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
   private Configuration cfg;
-  private Set<Future> futures = new HashSet<Future>();
-  private Set<Future> futuresForSplittingHFile = new HashSet<Future>();
 
   public static String NAME = "completebulkload";
 
@@ -112,7 +114,7 @@
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  private static class LoadQueueItem {
+  static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
@@ -120,13 +122,17 @@
       this.family = family;
       this.hfilePath = hfilePath;
     }
+
+    public String toString() {
+      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
   }
 
   /**
   * Walk the given directory for all HFiles, and return a Queue
   * containing all such files.
   */
-  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
   throws IOException {
     FileSystem fs = hfofDir.getFileSystem(getConf());
 
@@ -140,7 +146,6 @@
       throw new FileNotFoundException("No families found in " + hfofDir);
     }
 
-    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
     for (FileStatus stat : familyDirStatuses) {
       if (!stat.isDir()) {
         LOG.warn("Skipping non-directory " + stat.getPath());
@@ -156,7 +161,6 @@
         ret.add(new LoadQueueItem(family, hfile));
       }
     }
-    return ret;
   }
 
   /**
@@ -167,10 +171,10 @@
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
-  public void doBulkLoad(Path hfofDir, HTable table)
+  public void doBulkLoad(Path hfofDir, final HTable table)
     throws TableNotFoundException, IOException
   {
-    HConnection conn = table.getConnection();
+    final HConnection conn = table.getConnection();
 
     if (!conn.isTableAvailable(table.getTableName())) {
       throw new TableNotFoundException("Table " +
@@ -178,54 +182,51 @@
           "is not currently available.");
     }
 
-    Deque<LoadQueueItem> queue = null;
+    // initialize thread pools
     int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
       Runtime.getRuntime().availableProcessors());
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
     builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-
     ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
         60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(),
        builder.build());
     ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
     try {
-      queue = discoverLoadQueue(hfofDir);
-      // outer loop picks up LoadQueueItem due to HFile split
-      while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) {
-        Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
-        // inner loop groups callables
-        while (!queue.isEmpty()) {
-          LoadQueueItem item = queue.remove();
-          tryLoad(item, conn, table, queue, startEndKeys, pool);
+      discoverLoadQueue(queue, hfofDir);
+      int count = 0;
+
+      // Assumes that region splits can happen while this occurs.
+      while (!queue.isEmpty()) {
+        // need to reload split keys each iteration.
+        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+        if (count != 0) {
+          LOG.info("Split occurred while grouping HFiles, retry attempt "
+              + count + " with " + queue.size() + " files remaining to load");
         }
-        Iterator<Future> iter = futuresForSplittingHFile.iterator();
-        while (iter.hasNext()) {
-          boolean timeoutSeen = false;
-          Future future = iter.next();
-          try {
-            future.get(20, TimeUnit.MILLISECONDS);
-            break; // we have at least two new HFiles to process
-          } catch (ExecutionException ee) {
-            LOG.error(ee);
-          } catch (InterruptedException ie) {
-            LOG.error(ie);
-          } catch (TimeoutException te) {
-            timeoutSeen = true;
-          } finally {
-            if (!timeoutSeen) iter.remove();
-          }
+
+        int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 10);
+        if (count >= maxRetries) {
+          LOG.error("Retry attempted " + count
+              + " times without completing, bailing out");
+          return;
         }
+        count++;
+
+        // Using ByteBuffer for byte[] equality semantics
+        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+            pool, queue, startEndKeys);
+
+        bulkLoadPhase(table, conn, pool, queue, regionGroups);
+
+        // NOTE: The next iteration's split / group could happen in parallel to
+        // atomic bulkloads assuming that there are splits and no merges, and
+        // that we can atomically pull out the groups we want to retry.
       }
-      for (Future future : futures) {
-        try {
-          future.get();
-        } catch (ExecutionException ee) {
-          LOG.error(ee);
-        } catch (InterruptedException ie) {
-          LOG.error(ie);
-        }
-      }
+
     } finally {
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
@@ -241,15 +242,111 @@
     }
   }
 
+  /**
+   * This takes the LQI's grouped by likely regions and attempts to bulk load
+   * them. Any failures are re-queued for another pass with the
+   * groupOrSplitPhase.
+   */
+  private void bulkLoadPhase(final HTable table, final HConnection conn,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+    // atomically bulk load the groups.
+    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    for (Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
+      final byte[] first = e.getKey().array();
+      final Collection<LoadQueueItem> lqis = e.getValue();
+
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
+          return toRetry;
+        }
+      };
+      loadingFutures.add(pool.submit(call));
+    }
+
+    // get all the results.
+    for (Future<List<LoadQueueItem>> future : loadingFutures) {
+      try {
+        List<LoadQueueItem> toRetry = future.get();
+        if (toRetry != null && toRetry.size() != 0) {
+          // LQIs that are requeued to be regrouped.
+          queue.addAll(toRetry);
+        }
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during bulk load", e1);
+          throw (IOException) t; // would have been thrown if not parallelized
+        }
+        LOG.error("Unexpected execution exception during bulk load", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during bulk load", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+  }
+
+  /**
+   * @return a multimap of (region start key, LQI) that groups LQIs by likely
+   * bulk load region targets.
+   */
+  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
+      ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    // The multimap needs to be synchronized only within the scope of this
+    // phase, because the puts into it happen in the splitting futures.
+    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+
+    // drain LQIs and figure out bulk load groups
+    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    while (!queue.isEmpty()) {
+      final LoadQueueItem item = queue.remove();
+
+      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+          return splits;
+        }
+      };
+      splittingFutures.add(pool.submit(call));
+    }
+    // get all the results.  All grouping and splitting must finish before
+    // we can attempt the atomic loads.
+    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+      try {
+        List<LoadQueueItem> splits = lqis.get();
+        if (splits != null) {
+          queue.addAll(splits);
+        }
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during splitting", e1);
+          throw (IOException) t; // would have been thrown if not parallelized
+        }
+        LOG.error("Unexpected execution exception during splitting", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during splitting", e1);
+        throw new IllegalStateException(e1);
+      }
+    }
+    return regionGroups;
+  }
+
   // unique file name for the table
   String getUniqueName(byte[] tableName) {
     String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
     return name;
   }
 
-  void splitStoreFileAndRequeue(final LoadQueueItem item,
-      final Deque<LoadQueueItem> queue, final HTable table,
-      byte[] startKey, byte[] splitKey) throws IOException {
+  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
+      final HTable table, byte[] startKey,
+      byte[] splitKey) throws IOException {
     final Path hfilePath = item.hfilePath;
 
     // We use a '_' prefix which is ignored when walking directory trees
@@ -268,25 +365,26 @@
 
     // Add these back at the *front* of the queue, so there's a lower
     // chance that the region will just split again before we get there.
-    synchronized (queue) {
-      queue.addFirst(new LoadQueueItem(item.family, botOut));
-      queue.addFirst(new LoadQueueItem(item.family, topOut));
-    }
+    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
+    lqis.add(new LoadQueueItem(item.family, botOut));
+    lqis.add(new LoadQueueItem(item.family, topOut));
+
     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+    return lqis;
   }
 
   /**
-   * Attempt to load the given load queue item into its target region server.
+   * Attempt to assign the given load queue item into its target region group.
    * If the hfile boundary no longer fits into a region, physically splits
-   * the hfile such that the new bottom half will fit, and adds the two
-   * resultant hfiles back into the load queue.
+   * the hfile such that the new bottom half will fit and returns the list of
+   * LQI's corresponding to the resultant hfiles.
+   *
+   * protected for testing
+   */
-  private boolean tryLoad(final LoadQueueItem item,
-      final HConnection conn, final HTable table,
-      final Deque<LoadQueueItem> queue,
-      final Pair<byte[][], byte[][]> startEndKeys,
-      ExecutorService pool)
-  throws IOException {
+  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+      final LoadQueueItem item, final HTable table,
+      final Pair<byte[][], byte[][]> startEndKeys)
+      throws IOException {
     final Path hfilePath = item.hfilePath;
     final FileSystem fs = hfilePath.getFileSystem(getConf());
     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
@@ -305,54 +403,80 @@
         " last="  + Bytes.toStringBinary(last));
     if (first == null || last == null) {
       assert first == null && last == null;
+      // TODO what if this is due to a bad HFile?
       LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return false;
+      return null;
     }
     if (Bytes.compareTo(first, last) > 0) {
       throw new IllegalArgumentException(
          "Invalid range: " + Bytes.toStringBinary(first) +
          " > " + Bytes.toStringBinary(last));
     }
-    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
+        Bytes.BYTES_COMPARATOR);
     if (idx < 0) {
-      idx = -(idx+1)-1;
+      // not on a boundary; binarySearch returns -(insertion index).
+      // Calculate the region it would be in.
+      idx = -(idx + 1) - 1;
     }
     final int indexForCallable = idx;
     boolean lastKeyInRange =
      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
     if (!lastKeyInRange) {
-      Callable<Void> callable = new Callable<Void>() {
-        public Void call() throws Exception {
-          splitStoreFileAndRequeue(item, queue, table,
-              startEndKeys.getFirst()[indexForCallable],
-              startEndKeys.getSecond()[indexForCallable]);
-          return (Void)null;
-        }
-      };
-      futuresForSplittingHFile.add(pool.submit(callable));
-
-      return true;
+      List<LoadQueueItem> lqis = splitStoreFile(item, table,
+          startEndKeys.getFirst()[indexForCallable],
+          startEndKeys.getSecond()[indexForCallable]);
+      return lqis;
     }
 
+    // group regions.
+    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+    return null;
+  }
+
+  /**
+   * Attempts to do an atomic load of many hfiles into a region. If it fails,
+   * it returns a list of hfiles that need to be retried. If it is successful
+   * it will return an empty list.
+   *
+   * NOTE: To maintain row atomicity guarantees, the region server callable
+   * should succeed atomically and fail atomically.
+   *
+   * Protected for testing.
+   */
+  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+      byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
+
+    final List<Pair<byte[], String>> famPaths =
+      new ArrayList<Pair<byte[], String>>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+    }
 
-    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
+    final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn,
+        tableName, first) {
       @Override
       public Void call() throws Exception {
-        LOG.debug("Going to connect to server " + location +
-            "for row " + Bytes.toStringBinary(row));
-
+        LOG.debug("Going to connect to server " + location + " for row "
+            + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
-        server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+        server.bulkLoadHFiles(famPaths, regionName);
         return null;
       }
     };
 
-    Callable<Void> callable = new Callable<Void>() {
-      public Void call() throws Exception {
-        return conn.getRegionServerWithRetries(svrCallable);
-      }
-    };
-    futures.add(pool.submit(callable));
-    return false;
+    List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+    try {
+      conn.getRegionServerWithRetries(svrCallable);
+    } catch (IOException e) {
+      LOG.warn("Attempt to bulk load region containing "
+          + Bytes.toStringBinary(first) + " into table "
+          + Bytes.toStringBinary(tableName) + " with files " + lqis
+          + " failed");
+      toRetry.addAll(lqis);
+    }
+    return toRetry;
   }
 
  /**
@@ -559,7 +683,8 @@
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+    System.exit(ret);
   }
 }
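
Stepping back, the rewritten driver is easiest to review as a group-then-load fixed point. A compressed sketch of doBulkLoad's control flow (names are from this patch; pool construction and error handling elided):

    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    discoverLoadQueue(queue, hfofDir);
    int count = 0;
    while (!queue.isEmpty()) {
      // A split between iterations invalidates the old keys, so re-read them.
      Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
      if (count++ >= cfg.getInt("hbase.bulkload.retries.number", 10)) {
        return; // bail out rather than retry forever against a splitting table
      }
      // Phase 1: split hfiles that straddle region boundaries; group the rest
      // by target region start key.
      Multimap<ByteBuffer, LoadQueueItem> groups =
          groupOrSplitPhase(table, pool, queue, startEndKeys);
      // Phase 2: one atomic multi-family RPC per group; failed groups are
      // handed back and re-enter the queue for the next pass.
      bulkLoadPhase(table, conn, pool, queue, groups);
    }
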
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 1195063)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 
@@ -62,7 +63,7 @@
   // maintained a single global version number on all HBase Interfaces.  This
   // meant all HBase RPC was broke though only one of the three RPC Interfaces
   // had changed.  This has since been undone.
-  public static final long VERSION = 28L;
+  public static final long VERSION = 29L;
 
   /**
   * Get metainfo about an HRegion
@@ -323,9 +324,13 @@
   public MultiResponse multi(MultiAction multi) throws IOException;
 
   /**
-   * Bulk load an HFile into an open region
+   * Atomically bulk load multiple HFiles (say from different column families)
+   * into an open region.
+   *
+   * @param familyPaths List of (family, hfile path) pairs
+   * @param regionName name of region to load hfiles into
   */
-  public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
+  public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
   throws IOException;
 
   // Master methods
Index: src/main/resources/hbase-default.xml
===================================================================
--- src/main/resources/hbase-default.xml	(revision 1195063)
+++ src/main/resources/hbase-default.xml	(working copy)
@@ -129,6 +129,14 @@
     server, getting a cell's value, starting a row update, etc.
     Default: 10.
     </description>
  </property>
+  <property>
+    <name>hbase.bulkload.retries.number</name>
+    <value>10</value>
+    <description>Maximum retries.  This is the maximum number of iterations
+    of atomic bulk loads that are attempted in the face of splitting
+    operations. Default: 10.
+    </description>
+  </property>
  <property>
    <name>hbase.client.scanner.caching</name>
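
Tying the pieces together, a minimal driver for the new code path, either via the completebulkload tool entry point or programmatically (a sketch only; the table name and output path are hypothetical):

    Configuration conf = HBaseConfiguration.create();
    // Optional: raise the split-retry cap introduced in hbase-default.xml above.
    conf.setInt("hbase.bulkload.retries.number", 30);
    HTable table = new HTable(conf, "mytable");   // table must already exist
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // Directory layout is the HFileOutputFormat output: one subdir per family.
    loader.doBulkLoad(new Path("/bulk/mytable-output"), table);

Each pass of doBulkLoad groups the discovered HFiles by region and issues one atomic bulkLoadHFiles RPC per region, so rows spanning multiple column families become visible all at once.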