diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
index 16a56b8..75b2312 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
@@ -149,7 +149,7 @@ public abstract class TableLockManager {
    * A null implementation
    */
   @InterfaceAudience.Private
-  static class NullTableLockManager extends TableLockManager {
+  public static class NullTableLockManager extends TableLockManager {
     static class NullTableLock implements TableLock {
       @Override
       public void acquire() throws IOException {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0f7a2d6..371d7ce 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientEngine;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -437,6 +438,9 @@ public class HRegionServer implements ClientProtocol,
   /** Handle all the snapshot requests to this server */
   RegionServerSnapshotManager snapshotManager;
 
+  // Table level lock manager for locking region operations
+  private TableLockManager tableLockManager;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -634,6 +638,8 @@
     } catch (KeeperException e) {
       this.abort("Failed to reach zk cluster when creating snapshot handler.");
     }
+    this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
+        new ServerName(isa.getHostName(), isa.getPort(), startcode));
   }
 
   /**
@@ -1130,6 +1136,11 @@
     return regionServerAccounting;
   }
 
+  @Override
+  public TableLockManager getTableLockManager() {
+    return tableLockManager;
+  }
+
   /*
    * @param r Region to get RegionLoad for.
    *
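With this change the region server builds a real TableLockManager in the same ZooKeeper-dependent initialization step as the snapshot handler, and NullTableLockManager becomes public so test doubles outside the org.apache.hadoop.hbase.master package can return a no-op manager. A minimal sketch of that no-op contract follows, assuming only the classes touched by this patch (TableLockManager, NullTableLockManager, TableLock); the NullTableLockSketch class and the table name are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;

public class NullTableLockSketch {
  public static void main(String[] args) throws IOException {
    // The null manager needs no ZooKeeper quorum, which is why mocks of
    // RegionServerServices can return it from getTableLockManager().
    TableLockManager lockManager = new NullTableLockManager();
    TableLock lock = lockManager.readLock(Bytes.toBytes("someTable"), "sketch");
    lock.acquire();   // no-op in the null implementation
    try {
      // ... a region operation that would normally run under the table read lock ...
    } finally {
      lock.release(); // no-op as well, but callers keep the same shape as on a real server
    }
  }
}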
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 6dc6ae0..765745d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.zookeeper.KeeperException;
 
@@ -60,6 +61,11 @@ public interface RegionServerServices extends OnlineRegions {
   public RegionServerAccounting getRegionServerAccounting();
 
   /**
+   * @return RegionServer's instance of {@link TableLockManager}
+   */
+  public TableLockManager getTableLockManager();
+
+  /**
    * Tasks to perform after region open to complete deploy of region on
    * regionserver
    *
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index caa617e..2d757f6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
@@ -38,6 +39,7 @@ class SplitRequest implements Runnable {
   private final HRegion parent;
   private final byte[] midKey;
   private final HRegionServer server;
+  private TableLock tableLock;
 
   SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) {
     Preconditions.checkNotNull(hrs);
@@ -61,6 +63,18 @@ class SplitRequest implements Runnable {
     try {
       final long startTime = System.currentTimeMillis();
       SplitTransaction st = new SplitTransaction(parent, midKey);
+
+      // acquire a shared read lock on the table, so that table schema modifications
+      // do not happen concurrently
+      tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getName()
+          , "SPLIT_REGION:" + parent.getRegionNameAsString());
+      try {
+        tableLock.acquire();
+      } catch (IOException ex) {
+        tableLock = null;
+        throw ex;
+      }
+
       // If prepare does not return true, for some reason -- logged inside in
       // the prepare call -- we are not ready to split just now. Just return.
       if (!st.prepare()) return;
@@ -109,6 +123,18 @@
             RemoteExceptionHandler.checkIOException(io));
         }
       }
+      releaseTableLock();
     }
   }
+
+  protected void releaseTableLock() {
+    if (this.tableLock != null) {
+      try {
+        this.tableLock.release();
+      } catch (IOException ex) {
+        LOG.warn("Could not release the table lock", ex);
+        // TODO: if we get here and do not abort the RS, this lock will never be released
+      }
+    }
+  }
 }
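SplitRequest now brackets the split with a shared table read lock: the lock is requested before SplitTransaction.prepare(), the field is nulled if acquire() fails so an unacquired lock is never released, and releaseTableLock() runs on every exit path of run(). A sketch of that protocol in isolation, assuming only the APIs added here (RegionServerServices.getTableLockManager(), TableLockManager.readLock(byte[], String), TableLock.acquire()/release()); the helper class, method, and interface names are hypothetical:

import java.io.IOException;

import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

public class TableReadLockSketch {
  /** A region-side operation that must not overlap a table schema change. */
  interface RegionOperation {
    void run() throws IOException;
  }

  static void runUnderTableReadLock(RegionServerServices services, byte[] tableName,
      String purpose, RegionOperation op) throws IOException {
    // Shared read lock: many region operations may proceed in parallel, but all of
    // them are excluded while a table-level schema operation holds the exclusive lock.
    TableLock lock = services.getTableLockManager().readLock(tableName, purpose);
    lock.acquire();
    try {
      op.run();
    } finally {
      // Release on every exit path; SplitRequest additionally catches and logs
      // the IOException instead of letting it propagate.
      lock.release();
    }
  }
}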
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index ee3dbd2..0c7e519 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@@ -148,7 +149,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
   /**
    * @param sn Name of this mock regionserver
-   * @throws IOException 
+   * @throws IOException
    * @throws org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException
    */
   MockRegionServer(final Configuration conf, final ServerName sn)
@@ -290,6 +291,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
     return null;
   }
 
+  public TableLockManager getTableLockManager() {
+    return new NullTableLockManager();
+  }
+
   @Override
   public void postOpenDeployTasks(HRegion r, CatalogTracker ct)
       throws KeeperException, IOException {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
index 091cb61..8b1e192 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
@@ -21,9 +21,12 @@ package org.apache.hadoop.hbase.master;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -34,10 +37,12 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
 import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
@@ -45,7 +50,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -56,7 +64,7 @@ import org.junit.experimental.categories.Category;
 /**
  * Tests the default table lock manager
  */
-@Category(MediumTests.class)
+@Category(LargeTests.class)
 public class TestTableLockManager {
 
   private static final Log LOG =
@@ -291,4 +299,103 @@ public class TestTableLockManager {
     executor.shutdownNow();
   }
 
+  @Test(timeout = 600000)
+  public void testTableReadLock() throws Exception {
+    // Test plan: write some data to the table, then continuously alter the table and
+    // force splits concurrently until we have 10 regions. Verify the data just in case.
+    // Every region should contain the same table descriptor. This is not an exact test.
+    prepareMiniCluster();
+    LoadTestTool loadTool = new LoadTestTool();
+    loadTool.setConf(TEST_UTIL.getConfiguration());
+    int numKeys = 10000;
+    final byte[] tableName = Bytes.toBytes("testTableReadLock");
+    final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    final HTableDescriptor desc = new HTableDescriptor(tableName);
+    final byte[] family = Bytes.toBytes("test_cf");
+    desc.addFamily(new HColumnDescriptor(family));
+    admin.createTable(desc); // create with one region
+
+    // write some data, not much
+    int ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-write",
+        String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
+    if (0 != ret) {
+      String errorMsg = "Load failed with error code " + ret;
+      LOG.error(errorMsg);
+      fail(errorMsg);
+    }
+
+    int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
+    StoppableImplementation stopper = new StoppableImplementation();
+
+    // alter table every 10 sec
+    Chore alterThread = new Chore("Alter Chore", 10000, stopper) {
+      @Override
+      protected void chore() {
+        Random random = new Random();
+        try {
+          HTableDescriptor htd = admin.getTableDescriptor(tableName);
+          String val = String.valueOf(random.nextInt());
+          htd.getFamily(family).setValue(val, val);
+          desc.getFamily(family).setValue(val, val); // save it for later control
+          admin.modifyTable(tableName, htd);
+        } catch (Exception ex) {
+          LOG.warn("Caught exception", ex);
+          fail(ex.getMessage());
+        }
+      }
+    };
+
+    // split table every 5 sec
+    Chore splitThread = new Chore("Split thread", 5000, stopper) {
+      @Override
+      public void chore() {
+        try {
+          Random random = new Random();
+          List<HRegionInfo> regions = admin.getTableRegions(tableName);
+          byte[] regionName = regions.get(random.nextInt(regions.size())).getRegionName();
+          admin.flush(regionName);
+          admin.compact(regionName);
+          admin.split(regionName);
+        } catch (Exception ex) {
+          LOG.warn("Caught exception", ex);
+          fail(ex.getMessage());
+        }
+      }
+    };
+
+    alterThread.start();
+    splitThread.start();
+    while (true) {
+      List<HRegionInfo> regions = admin.getTableRegions(tableName);
+      LOG.info(String.format("Table #regions: %d regions: %s", regions.size(), regions));
+      assertEquals(admin.getTableDescriptor(tableName), desc);
+      for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
+        assertEquals(desc, region.getTableDesc());
+      }
+      if (regions.size() >= 10) {
+        break;
+      }
+      Threads.sleep(1000);
+    }
+    stopper.stop("test finished");
+
+    int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
+    LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
+    assertTrue(newFamilyValues > familyValues); // at least one alter went through
+
+    ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-read", "100:10",
+        "-num_keys", String.valueOf(numKeys), "-skip_init" });
+    if (0 != ret) {
+      String errorMsg = "Verify failed with error code " + ret;
+      LOG.error(errorMsg);
+      fail(errorMsg);
+    }
+
+    admin.close();
+  }
 }
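testTableReadLock races master-side schema changes (admin.modifyTable()) against region splits and asserts that every online region always sees the same table descriptor, which is what the shared read lock in SplitRequest is meant to guarantee. The master-side counterpart would take the exclusive lock; a rough sketch follows, with the caveat that writeLock(byte[], String) is assumed to exist with the same shape as readLock() and does not appear in this diff, and the class name and purpose strings are made up:

import java.io.IOException;

import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;

public class SchemaChangeLockSketch {
  // Assumption: writeLock(byte[] tableName, String purpose) exists alongside
  // readLock(); only readLock() is shown in this patch.
  static void alterUnderTableWriteLock(TableLockManager lockManager) throws IOException {
    TableLock lock = lockManager.writeLock(Bytes.toBytes("testTableReadLock"),
        "ALTER_TABLE: change a column family value");
    lock.acquire(); // blocks until outstanding split read locks are released
    try {
      // ... apply the HTableDescriptor change here ...
    } finally {
      lock.release(); // lets queued SPLIT_REGION read locks proceed again
    }
  }
}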
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index efcbe81..6bebc4a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -131,6 +133,11 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
+  public TableLockManager getTableLockManager() {
+    return new NullTableLockManager();
+  }
+
+  @Override
   public ServerName getServerName() {
     return this.serverName;
   }