diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java index 191d35e..c6452af 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java @@ -149,7 +149,7 @@ public abstract class TableLockManager { * A null implementation */ @InterfaceAudience.Private - static class NullTableLockManager extends TableLockManager { + public static class NullTableLockManager extends TableLockManager { static class NullTableLock implements TableLock { @Override public void acquire() throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c8ea3bd..920c26d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -29,7 +29,6 @@ import java.lang.reflect.Method; import java.net.BindException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -53,7 +52,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; -import com.google.protobuf.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -66,9 +64,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.FailedSanityCheckException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.OutOfOrderScannerNextException; @@ -111,6 +109,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientEngine; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -214,6 +213,7 @@ import org.cliffc.high_scale_lib.Counter; import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -427,6 +427,9 @@ public class HRegionServer implements ClientProtocol, private RegionServerCoprocessorHost rsHost; + // Table level lock manager for read locking for region operations + private TableLockManager tableLockManager; + /** * Starts a HRegionServer at the default location * @@ -765,6 +768,8 @@ public class HRegionServer implements ClientProtocol, } catch (KeeperException e) { this.abort("Failed to retrieve Cluster ID",e); } + this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, + new ServerName(isa.getHostName(), isa.getPort(), startcode)); } /** @@ -1131,8 +1136,8 @@ public class HRegionServer implements ClientProtocol, private void closeWAL(final boolean delete) { try { if (this.hlogForMeta != null) { - //All hlogs (meta and non-meta) are in the same directory. Don't call - //closeAndDelete here since that would delete all hlogs not just the + //All hlogs (meta and non-meta) are in the same directory. 
Don't call + //closeAndDelete here since that would delete all hlogs not just the //meta ones. We will just 'close' the hlog for meta here, and leave //the directory cleanup to the follow-on closeAndDelete call. this.hlogForMeta.close(); @@ -1242,6 +1247,11 @@ public class HRegionServer implements ClientProtocol, return regionServerAccounting; } + @Override + public TableLockManager getTableLockManager() { + return tableLockManager; + } + /* * @param r Region to get RegionLoad for. * @@ -1415,8 +1425,8 @@ public class HRegionServer implements ClientProtocol, Path logdir = new Path(rootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); - this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), - rootDir, logName, this.conf, getMetaWALActionListeners(), + this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), + rootDir, logName, this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString()); } return this.hlogForMeta; @@ -1518,7 +1528,7 @@ public class HRegionServer implements ClientProtocol, ".compactionChecker", uncaughtExceptionHandler); if (this.healthCheckChore != null) { Threads - .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", + .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker", uncaughtExceptionHandler); } @@ -1612,17 +1622,17 @@ public class HRegionServer implements ClientProtocol, return getWAL(null); } catch (IOException e) { LOG.warn("getWAL threw exception " + e); - return null; + return null; } } @Override public HLog getWAL(HRegionInfo regionInfo) throws IOException { //TODO: at some point this should delegate to the HLogFactory - //currently, we don't care about the region as much as we care about the + //currently, we don't care about the region as much as we care about the //table.. (hence checking the tablename below) - //_ROOT_ and .META. regions have separate WAL. - if (regionInfo != null && + //_ROOT_ and .META. 
regions have separate WAL. + if (regionInfo != null && regionInfo.isMetaTable()) { return getMetaWAL(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index e40871f..0e064ab 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; @@ -39,7 +40,7 @@ public interface RegionServerServices extends OnlineRegions { */ public boolean isStopping(); - /** @return the HLog for a particular region. Pass null for getting the + /** @return the HLog for a particular region. Pass null for getting the * default (common) WAL */ public HLog getWAL(HRegionInfo regionInfo) throws IOException; @@ -59,9 +60,14 @@ public interface RegionServerServices extends OnlineRegions { public RegionServerAccounting getRegionServerAccounting(); /** + * @return RegionServer's instance of {@link TableLockManager} + */ + public TableLockManager getTableLockManager(); + + /** * Tasks to perform after region open to complete deploy of region on * regionserver - * + * * @param r Region to open. 
 * @param ct Instance of {@link CatalogTracker} * @param daughter True if this is daughter of a split diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 0b5a316..836bec2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -38,6 +39,7 @@ class SplitRequest implements Runnable { private final HRegion parent; private final byte[] midKey; private final HRegionServer server; + private TableLock tableLock; SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) { Preconditions.checkNotNull(hrs); @@ -61,6 +63,18 @@ class SplitRequest implements Runnable { try { final long startTime = System.currentTimeMillis(); SplitTransaction st = new SplitTransaction(parent, midKey); + + //acquire a shared read lock on the table, so that table schema modifications + //do not happen concurrently + tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getName() + , "SPLIT_REGION:" + parent.getRegionNameAsString()); + try { + tableLock.acquire(); + } catch(IOException ex) { + tableLock = null; + throw ex; + } + + // If prepare does not return true, for some reason -- logged inside in // the prepare call -- we are not ready to split just now. Just return. 
if (!st.prepare()) return; @@ -110,6 +124,18 @@ RemoteExceptionHandler.checkIOException(io)); } } + releaseTableLock(); + } + } + + protected void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + //TODO: if we get here and do not abort the RS, this lock will never be released + } } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 4f55a3d..e785a0b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; @@ -147,8 +148,8 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer /** * @param sn Name of this mock regionserver - * @throws IOException - * @throws ZooKeeperConnectionException + * @throws IOException + * @throws ZooKeeperConnectionException */ MockRegionServer(final Configuration conf, final ServerName sn) throws ZooKeeperConnectionException, IOException { @@ -290,6 +291,11 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer } @Override + public TableLockManager getTableLockManager() { + return new NullTableLockManager(); + } + + @Override public void postOpenDeployTasks(HRegion r, CatalogTracker ct, boolean 
daughter) throws KeeperException, IOException { // TODO Auto-generated method stub diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java index 3a8c92b..60e7821 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java @@ -21,9 +21,12 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.List; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -34,10 +37,12 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableLockTimeoutException; import org.apache.hadoop.hbase.TableNotDisabledException; @@ -45,7 +50,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LoadTestTool; +import 
org.apache.hadoop.hbase.util.StoppableImplementation; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -56,7 +64,7 @@ import org.junit.experimental.categories.Category; /** * Tests the default table lock manager */ -@Category(MediumTests.class) +@Category(LargeTests.class) public class TestTableLockManager { private static final Log LOG = @@ -291,4 +299,103 @@ public class TestTableLockManager { executor.shutdownNow(); } + @Test(timeout = 600000) + public void testTableReadLock() throws Exception { + // test plan: write some data to the table. Continuously alter the table and + // force splits + // concurrently until we have 10 regions. verify the data just in case. + // Every region should contain the same table descriptor + // This is not an exact test + prepareMiniCluster(); + LoadTestTool loadTool = new LoadTestTool(); + loadTool.setConf(TEST_UTIL.getConfiguration()); + int numKeys = 10000; + final byte[] tableName = Bytes.toBytes("testTableReadLock"); + final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + final HTableDescriptor desc = new HTableDescriptor(tableName); + final byte[] family = Bytes.toBytes("test_cf"); + desc.addFamily(new HColumnDescriptor(family)); + admin.createTable(desc); // create with one region + + // write some data, not much + int ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-write", + String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" }); + if (0 != ret) { + String errorMsg = "Load failed with error code " + ret; + LOG.error(errorMsg); + fail(errorMsg); + } + + int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size(); + StoppableImplementation stopper = new StoppableImplementation(); + + //alter table every 10 sec + Chore alterThread = new Chore("Alter Chore", 10000, stopper) { + @Override + protected void chore() { + Random random = 
new Random(); + try { + HTableDescriptor htd = admin.getTableDescriptor(tableName); + String val = String.valueOf(random.nextInt()); + htd.getFamily(family).setValue(val, val); + desc.getFamily(family).setValue(val, val); // save it for later + // control + admin.modifyTable(tableName, htd); + } catch (Exception ex) { + LOG.warn("Caught exception", ex); + fail(ex.getMessage()); + } + } + }; + + //split table every 5 sec + Chore splitThread = new Chore("Split thread", 5000, stopper) { + @Override + public void chore() { + try { + Random random = new Random(); + List regions = admin.getTableRegions(tableName); + byte[] regionName = regions.get(random.nextInt(regions.size())).getRegionName(); + admin.flush(regionName); + admin.compact(regionName); + admin.split(regionName); + } catch (Exception ex) { + LOG.warn("Caught exception", ex); + fail(ex.getMessage()); + } + } + }; + + alterThread.start(); + splitThread.start(); + while (true) { + List regions = admin.getTableRegions(tableName); + LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions)); + assertEquals(admin.getTableDescriptor(tableName), desc); + for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) { + assertEquals(desc, region.getTableDesc()); + } + if (regions.size() >= 10) { + break; + } + Threads.sleep(1000); + } + stopper.stop("test finished"); + + int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size(); + LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues)); + assertTrue(newFamilyValues > familyValues); // at least one alter went + // through + + ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-read", "100:10", + "-num_keys", String.valueOf(numKeys), "-skip_init" }); + if (0 != ret) { + String errorMsg = "Verify failed with error code " + ret; + LOG.error(errorMsg); + fail(errorMsg); + } + + admin.close(); + } + } diff --git 
hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java index 0db1977..6732197 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -46,7 +48,7 @@ import org.apache.zookeeper.KeeperException; public class MockRegionServerServices implements RegionServerServices { private final Map regions = new HashMap(); private boolean stopping = false; - private final ConcurrentSkipListMap rit = + private final ConcurrentSkipListMap rit = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private HFileSystem hfs = null; private ZooKeeperWatcher zkw = null; @@ -118,12 +120,17 @@ public class MockRegionServerServices implements RegionServerServices { public ZooKeeperWatcher getZooKeeper() { return zkw; } - + public RegionServerAccounting getRegionServerAccounting() { return null; } @Override + public TableLockManager getTableLockManager() { + return new NullTableLockManager(); + } + + @Override public ServerName getServerName() { return null; }