diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1e24b8c..39bb622 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -122,6 +122,20 @@ public final class HConstants { public static final String ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout"; + /** Configuration key for enabling table-level locks for schema changes */ + public static final String MASTER_TABLE_LOCK_ENABLE = + "hbase.table.lock.enable"; + + /** by default we should enable table-level locks for schema changes */ + public static final boolean DEFAULT_TABLE_LOCK_ENABLE = true; + + /** Configuration key for time out for trying to acquire table locks */ + public static final String TABLE_LOCK_TIMEOUT_MS = + "hbase.table.lock.timeout.ms"; + + public static final int DEFAULT_TABLE_LOCK_TIMEOUT_MS = + 600 * 1000; //10 min default + /** Name of ZooKeeper quorum configuration parameter. */ public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; @@ -745,7 +759,7 @@ public final class HConstants { Arrays.asList(new String[] { HREGION_LOGDIR_NAME, HREGION_OLDLOGDIR_NAME, CORRUPT_DIR_NAME, toString(META_TABLE_NAME), toString(ROOT_TABLE_NAME), SPLIT_LOGDIR_NAME, HBCK_SIDELINEDIR_NAME, HFILE_ARCHIVE_DIRECTORY })); - + private HConstants() { // Can't be instantiated with this ctor. } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java new file mode 100644 index 0000000..cd742ad --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java @@ -0,0 +1,37 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class TableLockTimeoutException extends IOException { + + private static final long serialVersionUID = -1770764924258999825L; + + /** Default constructor */ + public TableLockTimeoutException() { + super(); + } + + public TableLockTimeoutException(String s) { + super(s); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 9b5f83f..05c2215 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.collect.LinkedHashMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.classification.InterfaceAudience; @@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.data.Stat; +import com.google.common.collect.LinkedHashMultimap; + /** * Manages and performs region assignment. *

@@ -111,6 +111,8 @@ public class AssignmentManager extends ZooKeeperListener { private LoadBalancer balancer; + private final TableLockManager tableLockManager; + final private KeyLocker locker = new KeyLocker(); /** @@ -177,7 +179,8 @@ public class AssignmentManager extends ZooKeeperListener { */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, - final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException { + final ExecutorService service, MetricsMaster metricsMaster, + final TableLockManager tableLockManager) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; @@ -195,6 +198,7 @@ public class AssignmentManager extends ZooKeeperListener { Threads.setDaemonThreadRunning(timerUpdater.getThread(), server.getServerName() + ".timerUpdater"); this.zkTable = new ZKTable(this.watcher); + this.tableLockManager = tableLockManager; this.maximumAttempts = this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = balancer; @@ -2316,8 +2320,8 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("The table " + tableName + " is in DISABLING state. 
Hence recovering by moving the table" + " to DISABLED state."); - new DisableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker, + this, tableLockManager, true).process(); } } } @@ -2342,7 +2346,7 @@ public class AssignmentManager extends ZooKeeperListener { // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + catalogTracker, this, tableLockManager, true).process(); } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d5f1f63..f45bc30 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -40,11 +40,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; -import com.google.common.collect.Maps; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -167,12 +162,13 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; +import 
org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.InfoServer; @@ -192,10 +188,13 @@ import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import org.apache.hadoop.hbase.trace.SpanReceiverHost; -import org.apache.hadoop.hbase.util.FSUtils; +import com.google.common.collect.Maps; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import com.google.protobuf.ServiceException; /** @@ -320,6 +319,9 @@ Server { private Map coprocessorServiceHandlers = Maps.newHashMap(); + // Table level lock manager for schema changes + private TableLockManager tableLockManager; + /** * Initializes the HMaster. The steps are as follows: *

@@ -399,6 +401,7 @@ Server { this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this)); + } /** @@ -538,7 +541,9 @@ Server { this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); this.loadBalancerTracker.start(); + this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.balancer, this.executorService, this.metricsMaster); + this.catalogTracker, this.balancer, this.executorService, this.metricsMaster, + this.tableLockManager); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, @@ -673,6 +678,10 @@ Server { startServiceThreads(); } + if (!masterRecovery) { + this.tableLockManager.reapAllTableWriteLocks(); + } + // Wait for region servers to report in. this.serverManager.waitForRegionServers(status); // Check zk for region servers that are up but didn't register @@ -1018,6 +1027,11 @@ Server { return this.zooKeeper; } + @Override + public TableLockManager getTableLockManager() { + return this.tableLockManager; + } + /* * Start up all services. If any of these threads gets an unhandled exception * then they just die with a logged message. 
This should be fine because @@ -1342,6 +1356,14 @@ Server { return balancerRan; } + protected void lockTable(byte[] tableName, String purpose) throws IOException { + tableLockManager.lockTable(tableName, purpose); + } + + protected void unlockTable(byte[] tableName) throws IOException { + tableLockManager.unlockTable(tableName); + } + @Override public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException { try { @@ -1496,11 +1518,10 @@ Server { this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, - newRegions, catalogTracker, assignmentManager)); + newRegions, this)); if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } - } private void checkCompression(final HTableDescriptor htd) @@ -1696,7 +1717,7 @@ Server { cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false)); if (cpHost != null) { cpHost.postEnableTable(tableName); } @@ -1720,7 +1741,7 @@ Server { cpHost.preDisableTable(tableName); } this.executorService.submit(new DisableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false)); if (cpHost != null) { cpHost.postDisableTable(tableName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index f9aa860..03d6b75 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; -import com.google.protobuf.Service; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import com.google.protobuf.Service; + /** * Services Master supplies */ @@ -56,6 +57,11 @@ public interface MasterServices extends Server { public ExecutorService getExecutorService(); /** + * @return Master's instance of {@link TableLockManager} + */ + public TableLockManager getTableLockManager(); + + /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. * @throws TableNotDisabledException diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java new file mode 100644 index 0000000..c7dc21c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java @@ -0,0 +1,279 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableLockTimeoutException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; + +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.recipes.locks.InterProcessLock; +import com.netflix.curator.framework.recipes.locks.InterProcessMutex; +import com.netflix.curator.framework.recipes.locks.InterProcessReadWriteLock; + +/** + * A manager for distributed table level locks. + */ +public abstract class TableLockManager { + + private static final Log LOG = LogFactory.getLog(TableLockManager.class); + + /** + * Lock a table, given a purpose. + * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @throws TableLockTimeoutException If unable to acquire a lock within a + * specified time period (if any) + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public abstract void lockTable(byte[] tableName, String purpose) throws IOException; + + /** + * Lock a table, given a purpose. Does not throw an exception. 
+ * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @return whether lockTable was successful + */ + public boolean lockTableFailSilently(byte[] tableName, String purpose) { + try { + lockTable(tableName, purpose); + return true; + } catch (Exception ex) { + LOG.warn("locking table " + Bytes.toString(tableName) + " failed with: " + + StringUtils.stringifyException(ex)); + } + return false; + } + + /** + * UnLock a table + * @param tableName Table to unlock + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public abstract void unlockTable(byte[] tableName) throws IOException; + + /** + * UnLock a table. Does not throw an exception. + * @param tableName Table to unlock + * Failures are logged at WARN level instead of being thrown. + */ + public void unlockTableFailSilently(byte[] tableName) { + try { + unlockTable(tableName); + } catch (Exception ex) { + LOG.warn("unlocking table " + Bytes.toString(tableName) + " failed with: " + + StringUtils.stringifyException(ex)); + } + } + + /** + * Force releases all table write locks even if this thread does not + * own the lock. The behavior of the lock holders still thinking that they + * have the lock is undefined. This should be used carefully and only when + * we can ensure that all write-lock holders have died. For example if only + * the master can hold write locks, then we can reap its locks when the backup + * master starts. + */ + public abstract void reapAllTableWriteLocks() throws IOException; + + /** + * Creates and returns a TableLockManager according to the configuration + */ + public static TableLockManager createTableLockManager(Configuration conf, + ZooKeeperWatcher zkWatcher, ServerName serverName) { + // Initialize table level lock manager for schema changes, if enabled. 
+ if (conf.getBoolean(HConstants.MASTER_TABLE_LOCK_ENABLE, + HConstants.DEFAULT_TABLE_LOCK_ENABLE)) { + int lockTimeoutMs = conf.getInt(HConstants.TABLE_LOCK_TIMEOUT_MS, + HConstants.DEFAULT_TABLE_LOCK_TIMEOUT_MS); + return new CuratorTableLockManager(zkWatcher, serverName, lockTimeoutMs); + //return new ZKTableLockManager(zkWatcher, serverName, schemaChangeLockTimeoutMs); + } else { + return new NullTableLockManager(); + } + } + + /** + * A null implementation + */ + public static class NullTableLockManager extends TableLockManager { + @Override + public void lockTable(byte[] tableName, String purpose) throws IOException { + } + @Override + public void unlockTable(byte[] tableName) throws IOException { + } + @Override + public void reapAllTableWriteLocks() throws IOException { + } + } + + private static class CuratorTableLockManager extends TableLockManager { + + private final ServerName serverName; + private final ZooKeeperWatcher zkWatcher; + private final long lockTimeoutMs; + + // IPLocks are reentrant in the same thread. We are caching one lock per table. + private final ConcurrentMap locks; + + /** + * Copied from InterProcessReadWriteLock.java + */ + private static final String WRITE_LOCK_NAME = "__WRIT__"; + + public CuratorTableLockManager(ZooKeeperWatcher zkWatcher, ServerName serverName, long lockTimeoutMs) { + locks = new ConcurrentHashMap(); + this.zkWatcher = zkWatcher; + this.serverName = serverName; + this.lockTimeoutMs = lockTimeoutMs; + } + + /** + * Lock a table, given a purpose. 
+ * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @throws TableLockTimeoutException If unable to acquire a lock within a + * specified time period (if any) + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public void lockTable(byte[] tableName, String purpose) throws IOException { + String tableNameStr = Bytes.toString(tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to acquire table lock on :" + tableNameStr + " for:" + purpose); + } + InterProcessReadWriteLock lock = createOrGetTableLock(tableNameStr, purpose); + + InterProcessLock writeLock = lock.writeLock(); + + try { + if (lockTimeoutMs < 0) { + writeLock.acquire(); + } else { + if (!writeLock.acquire(lockTimeoutMs, TimeUnit.MILLISECONDS)) { + throw new TableLockTimeoutException("Timed out acquiring " + + "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms."); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Acquired table lock on :" + tableNameStr + " for:" + purpose); + } + } catch (InterruptedException ex) { + LOG.warn("Interrupted acquiring a lock for " + tableNameStr, ex); + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted acquiring a lock"); + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + } + + /** Returns a reentrant lock or creates one for this process */ + private InterProcessReadWriteLock createOrGetTableLock(String tableName, String purpose) { + InterProcessReadWriteLock lock = locks.get(tableName); + if (lock != null) { + return lock; + } + + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + + lock = new InterProcessReadWriteLock(getCuratorClient(), tableLockZNode); + InterProcessReadWriteLock returned = locks.putIfAbsent(tableName, lock); + return returned == null ? 
lock : returned; + } + + private CuratorFramework getCuratorClient() { + return zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + } + + public void unlockTable(byte[] tableName) throws IOException { + String tableNameStr = Bytes.toString(tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to release table lock on :" + tableNameStr); + } + + InterProcessReadWriteLock lock = createOrGetTableLock(tableNameStr, "unlock"); + InterProcessLock writeLock = lock.writeLock(); + + try { + writeLock.release(); + if (LOG.isDebugEnabled()) { + LOG.debug("Relased table lock on : " + tableNameStr); + } + if (!writeLock.isAcquiredInThisProcess()) { + locks.remove(writeLock); //TODO: race condition + } + } catch (InterruptedException ex) { + LOG.warn("Interrupted while releasing a lock for " + tableNameStr, ex); + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + } + + @Override + public void reapAllTableWriteLocks() throws IOException { + //get the table names + try { + CuratorFramework curatorClient = getCuratorClient(); + List tableNames = curatorClient.getChildren().forPath(zkWatcher.tableLockZNode); + + for (String tableName : tableNames) { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + InterProcessReadWriteLock lock + = new InterProcessReadWriteLock(curatorClient, tableLockZNode); + InterProcessMutex writeLock = lock.writeLock(); + Collection znodes = writeLock.getParticipantNodes(); + for (String znode : znodes) { + if (znode.contains(WRITE_LOCK_NAME)) { + LOG.info("Reaping table write lock for table:" + tableName + + " znode:" + znode); + curatorClient.delete().guaranteed().forPath(znode); + } + } + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + LOG.warn("Caught exception while reaping table write locks", ex); + } + } + } + +} diff --git 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 0130f51..e766d75 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.Threads; @@ -65,11 +67,12 @@ public class CreateTableHandler extends EventHandler { private Configuration conf; private final AssignmentManager assignmentManager; private final CatalogTracker catalogTracker; + private final TableLockManager tableLockManager; private final HRegionInfo [] newRegions; public CreateTableHandler(Server server, MasterFileSystem fileSystemManager, HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) + MasterServices masterServices) throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { super(server, EventType.C_M_CREATE_TABLE); @@ -77,8 +80,9 @@ public class CreateTableHandler extends EventHandler { this.hTableDescriptor = hTableDescriptor; this.conf = conf; this.newRegions = newRegions; - this.catalogTracker = catalogTracker; - this.assignmentManager = assignmentManager; + this.catalogTracker = masterServices.getCatalogTracker(); + this.assignmentManager = masterServices.getAssignmentManager(); + this.tableLockManager = 
masterServices.getTableLockManager(); int timeout = conf.getInt("hbase.client.catalog.timeout", 10000); // Need META availability to create a table @@ -126,8 +130,14 @@ public class CreateTableHandler extends EventHandler { @Override public void process() { String tableName = this.hTableDescriptor.getNameAsString(); + LOG.info("Attempting to create the table " + tableName); + + //acquire table lock before calling the coprocessors + + if (!tableLockManager.lockTableFailSilently(hTableDescriptor.getName(), "create")) { + return; + } try { - LOG.info("Attempting to create the table " + tableName); MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost(); if (cpHost != null) { cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions); @@ -140,6 +150,8 @@ public class CreateTableHandler extends EventHandler { LOG.error("Error trying to create the table " + tableName, e); } catch (KeeperException e) { LOG.error("Error trying to create the table " + tableName, e); + } finally { + tableLockManager.unlockTableFailSilently(hTableDescriptor.getName()); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java index 3a6b629..66a3c4c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; import org.cloudera.htrace.Trace; @@ -50,15 +51,17 @@ public class DisableTableHandler extends EventHandler { private 
final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; public DisableTableHandler(Server server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) + TableLockManager tableLockManager, boolean skipTableStateCheck) throws TableNotFoundException, TableNotEnabledException, IOException { super(server, EventType.C_M_DISABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(this.tableName); this.assignmentManager = assignmentManager; + this.tableLockManager = tableLockManager; // Check if table exists // TODO: do we want to keep this in-memory as well? i guess this is // part of old master rewrite, schema to zk to check for table @@ -100,6 +103,9 @@ public void process() { + if (!tableLockManager.lockTableFailSilently(tableName, "disable")) { + return; + } try { LOG.info("Attempting to disable table " + this.tableNameStr); MasterCoprocessorHost cpHost = ((HMaster) this.server) .getCoprocessorHost(); if (cpHost != null) { @@ -113,6 +119,8 @@ LOG.error("Error trying to disable table " + this.tableNameStr, e); } catch (KeeperException e) { LOG.error("Error trying to disable table " + this.tableNameStr, e); + } finally { + tableLockManager.unlockTableFailSilently(tableName); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java index f46c870..b825535 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import 
org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.zookeeper.KeeperException; @@ -55,18 +56,20 @@ public class EnableTableHandler extends EventHandler { private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; private final CatalogTracker ct; private boolean retainAssignment = false; public EnableTableHandler(Server server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) + TableLockManager tableLockManager, boolean skipTableStateCheck) throws TableNotFoundException, TableNotDisabledException, IOException { super(server, EventType.C_M_ENABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(tableName); this.ct = catalogTracker; this.assignmentManager = assignmentManager; + this.tableLockManager = tableLockManager; this.retainAssignment = skipTableStateCheck; // Check if table exists if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { @@ -106,6 +109,9 @@ public void process() { + if (!tableLockManager.lockTableFailSilently(tableName, "enable")) { + return; + } try { LOG.info("Attempting to enable the table " + this.tableNameStr); MasterCoprocessorHost cpHost = ((HMaster) this.server) .getCoprocessorHost(); if (cpHost != null) { @@ -121,6 +127,8 @@ LOG.error("Error trying to enable the table " + this.tableNameStr, e); } catch (InterruptedException e) { LOG.error("Error trying to enable the table " + this.tableNameStr, e); + } finally { + tableLockManager.unlockTableFailSilently(tableName); } } 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java index c6f70e9..a2a06e5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java @@ -41,7 +41,7 @@ public class TableDeleteFamilyHandler extends TableEventHandler { public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices); HTableDescriptor htd = getTableDescriptor(); this.familyName = hasColumnFamily(htd, familyName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index aedfa96..c1badbf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -93,6 +93,11 @@ public abstract class TableEventHandler extends EventHandler { try { LOG.info("Handling table operation " + eventType + " on table " + Bytes.toString(tableName)); + + if (!masterServices.getTableLockManager().lockTableFailSilently( + tableName, eventType.toString())) { + return; + } List hris = MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); @@ -112,6 +117,7 @@ public abstract class TableEventHandler extends EventHandler { } catch (KeeperException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } finally { + masterServices.getTableLockManager().unlockTableFailSilently(tableName); // notify the waiting thread that 
we're done persisting the request setPersist(); } @@ -141,7 +147,7 @@ public abstract class TableEventHandler extends EventHandler { reRegions.add(hri); serverToRegions.get(rsLocation).add(hri); } - + LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size() + " region servers."); this.masterServices.getAssignmentManager().setRegionsToReopen(reRegions); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java index 8758f9e..f333a51 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java @@ -18,8 +18,11 @@ */ package org.apache.hadoop.hbase; -import java.util.Set; import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -91,7 +94,7 @@ public abstract class MultithreadedTestUtil { stopped = s; } } - + public void stop() throws Exception { synchronized (this) { stopped = true; @@ -130,7 +133,7 @@ public abstract class MultithreadedTestUtil { this.stopped = true; } } - + /** * A test thread that performs a repeating operation. */ @@ -138,13 +141,48 @@ public abstract class MultithreadedTestUtil { public RepeatingTestThread(TestContext ctx) { super(ctx); } - + public final void doWork() throws Exception { while (ctx.shouldRun() && !stopped) { doAnAction(); } } - + public abstract void doAnAction() throws Exception; } + + /** + * Verify that no assertions have failed inside a future. + * Used for unit tests that spawn threads. E.g., + *

+ * + * List> results = Lists.newArrayList(); + * Future f = executor.submit(new Callable { + * public Void call() { + * assertTrue(someMethod()); + * } + * }); + * results.add(f); + * assertOnFutures(results); + * + * @param threadResults A list of futures + * @param + * @throws InterruptedException If interrupted when waiting for a result + * from one of the futures + * @throws ExecutionException If an exception other than AssertionError + * occurs inside any of the futures + */ + public static void assertOnFutures(List> threadResults) + throws InterruptedException, ExecutionException { + for (Future threadResult : threadResults) { + try { + threadResult.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause(); + } + throw e; + } + } + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 8880fa0..9282411 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -181,7 +182,7 @@ public class TestAssignmentManager { * @throws IOException * @throws KeeperException * @throws InterruptedException - * @throws DeserializationException + * @throws DeserializationException */ @Test(timeout = 5000) public 
void testBalanceOnMasterFailoverScenarioWithOpenedNode() @@ -339,7 +340,7 @@ public class TestAssignmentManager { * from one server to another mocking regionserver responding over zk. * @throws IOException * @throws KeeperException - * @throws DeserializationException + * @throws DeserializationException */ @Test public void testBalance() @@ -354,7 +355,7 @@ public class TestAssignmentManager { .getConfiguration()); // Create an AM. AssignmentManager am = new AssignmentManager(this.server, - this.serverManager, ct, balancer, executor, null); + this.serverManager, ct, balancer, executor, null, master.getTableLockManager()); am.failoverCleanupDone.set(true); try { // Make sure our new AM gets callbacks; once registered, can't unregister. @@ -435,7 +436,7 @@ public class TestAssignmentManager { * To test closed region handler to remove rit and delete corresponding znode * if region in pending close or closing while processing shutdown of a region * server.(HBASE-5927). - * + * * @throws KeeperException * @throws IOException * @throws ServiceException @@ -446,12 +447,12 @@ public class TestAssignmentManager { testCaseWithPartiallyDisabledState(Table.State.DISABLING); testCaseWithPartiallyDisabledState(Table.State.DISABLED); } - - + + /** * To test if the split region is removed from RIT if the region was in SPLITTING state but the RS * has actually completed the splitting in META but went down. See HBASE-6070 and also HBASE-5806 - * + * * @throws KeeperException * @throws IOException */ @@ -462,7 +463,7 @@ public class TestAssignmentManager { // false indicate the region is not split testCaseWithSplitRegionPartial(false); } - + private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException, IOException, NodeExistsException, InterruptedException, ServiceException { // Create and startup an executor. This is used by AssignmentManager @@ -523,7 +524,7 @@ public class TestAssignmentManager { // Create an AM. 
AssignmentManager am = new AssignmentManager(this.server, - this.serverManager, ct, balancer, executor, null); + this.serverManager, ct, balancer, executor, null, master.getTableLockManager()); // adding region to regions and servers maps. am.regionOnline(REGIONINFO, SERVERNAME_A); // adding region in pending close. @@ -644,7 +645,7 @@ public class TestAssignmentManager { .getConfiguration()); // Create an AM. AssignmentManager am = new AssignmentManager(this.server, - this.serverManager, ct, balancer, null, null); + this.serverManager, ct, balancer, null, null, master.getTableLockManager()); try { // First make sure my mock up basically works. Unassign a region. unassign(am, SERVERNAME_A, hri); @@ -672,7 +673,7 @@ public class TestAssignmentManager { * Tests the processDeadServersAndRegionsInTransition should not fail with NPE * when it failed to get the children. Let's abort the system in this * situation - * @throws ServiceException + * @throws ServiceException */ @Test(timeout = 5000) public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE() @@ -701,7 +702,7 @@ public class TestAssignmentManager { } } /** - * TestCase verifies that the regionPlan is updated whenever a region fails to open + * TestCase verifies that the regionPlan is updated whenever a region fails to open * and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546). */ @Test(timeout = 5000) @@ -788,7 +789,7 @@ public class TestAssignmentManager { this.gate.set(true); return randomServerName; } - + @Override public Map> retainAssignment( Map regions, List servers) { @@ -829,7 +830,7 @@ public class TestAssignmentManager { /** * Test verifies whether assignment is skipped for regions of tables in DISABLING state during * clean cluster startup. See HBASE-6281. 
- * + * * @throws KeeperException * @throws IOException * @throws Exception @@ -875,7 +876,7 @@ public class TestAssignmentManager { /** * Test verifies whether all the enabling table regions assigned only once during master startup. - * + * * @throws KeeperException * @throws IOException * @throws Exception @@ -895,7 +896,8 @@ public class TestAssignmentManager { try { // set table in enabling state. am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString()); - new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true) + new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), + am, new NullTableLockManager(), true) .process(); assertEquals("Number of assignments should be 1.", 1, assignmentCount); assertTrue("Table should be enabled.", @@ -1038,7 +1040,7 @@ public class TestAssignmentManager { ExecutorService executor = startupMasterExecutor("mockedAMExecutor"); this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration()); AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting( - server, manager, ct, this.balancer, executor); + server, manager, ct, this.balancer, executor, new NullTableLockManager()); return am; } @@ -1057,8 +1059,9 @@ public class TestAssignmentManager { public AssignmentManagerWithExtrasForTesting( final Server master, final ServerManager serverManager, final CatalogTracker catalogTracker, final LoadBalancer balancer, - final ExecutorService service) throws KeeperException, IOException { - super(master, serverManager, catalogTracker, balancer, service, null); + final ExecutorService service, final TableLockManager tableLockManager) + throws KeeperException, IOException { + super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager); this.es = service; this.ct = catalogTracker; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 3073041..c2d4ba7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; -import com.google.protobuf.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -73,6 +72,7 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import com.google.protobuf.ServiceException; @Category(SmallTests.class) @@ -229,6 +229,11 @@ public class TestCatalogJanitor { } @Override + public TableLockManager getTableLockManager() { + return null; + } + + @Override public void abort(String why, Throwable e) { //no-op } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java new file mode 100644 index 0000000..315c077 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java @@ -0,0 +1,271 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests the default table lock manager + */ +@Category(MediumTests.class) +public class TestTableLockManager { + + private static final Log LOG = + 
LogFactory.getLog(TestTableLockManager.class); + + private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks"); + + private static final byte[] FAMILY = Bytes.toBytes("f1"); + + private static final byte[] NEW_FAMILY = Bytes.toBytes("f2"); + + private final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final CountDownLatch deleteColumn = new CountDownLatch(1); + private static final CountDownLatch addColumn = new CountDownLatch(1); + + public void prepareMiniCluster() throws Exception { + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + } + + public void prepareMiniZkCluster() throws Exception { + TEST_UTIL.startMiniZKCluster(1); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout = 60000) + public void testLockTimeoutException() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.TABLE_LOCK_TIMEOUT_MS, 3000); + prepareMiniCluster(); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class, + 0, TEST_UTIL.getConfiguration()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future shouldFinish = executor.submit(new Callable() { + @Override + public Object call() throws Exception { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.deleteColumn(TABLE_NAME, FAMILY); + return null; + } + }); + + deleteColumn.await(); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY)); + + //this request will timeout, but not in rpc, it is an async call + shouldFinish.get(); + } + + public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver { + @Override + public void preDeleteColumnHandler(ObserverContext ctx, + byte[] 
tableName, byte[] c) throws IOException { + deleteColumn.countDown(); + } + @Override + public void postDeleteColumnHandler(ObserverContext ctx, + byte[] tableName, byte[] c) throws IOException { + Threads.sleep(10000); + } + + @Override + public void preAddColumnHandler(ObserverContext ctx, + byte[] tableName, HColumnDescriptor column) throws IOException { + fail("Add column should have timeouted out for acquiring the table lock"); + } + } + + @Test(timeout = 60000) + public void testAlterAndDisable() throws Exception { + prepareMiniCluster(); + // Send a request to alter a table, then sleep during + // the alteration phase. In the mean time, from another + // thread, send a request to disable, and then delete a table. + + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class, + 0, TEST_UTIL.getConfiguration()); + + ExecutorService executor = Executors.newFixedThreadPool(2); + Future alterTableFuture = executor.submit(new Callable() { + @Override + public Object call() throws Exception { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY)); + LOG.info("Added new column family"); + HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME); + assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY)); + return null; + } + }); + Future disableTableFuture = executor.submit(new Callable() { + @Override + public Object call() throws Exception { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.disableTable(TABLE_NAME); + assertTrue(admin.isTableDisabled(TABLE_NAME)); + admin.deleteTable(TABLE_NAME); + assertFalse(admin.tableExists(TABLE_NAME)); + return null; + } + }); + + try { + disableTableFuture.get(); + alterTableFuture.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause(); + } + throw e; + } + } + + public static class 
TestAlterAndDisableMasterObserver extends BaseMasterObserver { + @Override + public void preAddColumnHandler(ObserverContext ctx, + byte[] tableName, HColumnDescriptor column) throws IOException { + LOG.debug("addColumn called"); + addColumn.countDown(); + } + + @Override + public void postAddColumnHandler(ObserverContext ctx, + byte[] tableName, HColumnDescriptor column) throws IOException { + Threads.sleep(6000); + try { + ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName); + } catch(TableNotDisabledException expected) { + //pass + return; + } catch(IOException ex) { + } + fail("was expecting the table to be enabled"); + } + + @Override + public void preDisableTable(ObserverContext ctx, + byte[] tableName) throws IOException { + try { + LOG.debug("Waiting for addColumn to be processed first"); + //wait for addColumn to be processed first + addColumn.await(); + LOG.debug("addColumn started, we can continue"); + } catch (InterruptedException ex) { + LOG.warn("Sleep interrupted while waiting for addColumn countdown"); + } + } + + @Override + public void postDisableTableHandler(ObserverContext ctx, + byte[] tableName) throws IOException { + Threads.sleep(3000); + } + } + + + @Test(timeout = 60000) + public void testReapAllTableLocks() throws Exception { + prepareMiniZkCluster(); + ServerName serverName = new ServerName("localhost:10000", 0); + final TableLockManager lockManager = TableLockManager.createTableLockManager( + TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName); + + String tables[] = {"table1", "table2", "table3", "table4"}; + ExecutorService executor = Executors.newFixedThreadPool(5); + + final CountDownLatch locksObtained = new CountDownLatch(4); + final CountDownLatch locksAttempted = new CountDownLatch(10); + //TODO: read lock tables + + //5 threads will be stuck waiting for the table lock + for (int i = 0; i < tables.length; i++) { + final String table = tables[i]; + for (int j = 0; j < i+1; j++) { //i+1 
write locks attempted for table[i] + executor.submit(new Callable() { + @Override + public Void call() throws Exception { + locksAttempted.countDown(); + lockManager.lockTable(Bytes.toBytes(table), "testReapAllTableLocks"); + locksObtained.countDown(); + return null; + } + }); + } + } + + locksObtained.await(); + locksAttempted.await(); + + //now reap all table locks + lockManager.reapAllTableWriteLocks(); + + TEST_UTIL.getConfiguration().setInt(HConstants.TABLE_LOCK_TIMEOUT_MS, 0); + TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager( + TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName); + + //should not throw table lock timeout exception + zeroTimeoutLockManager.lockTable(Bytes.toBytes(tables[tables.length -1]), "zero timeout"); + + executor.shutdownNow(); + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInterProcessReadWriteLock.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInterProcessReadWriteLock.java new file mode 100644 index 0000000..b7faf72 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInterProcessReadWriteLock.java @@ -0,0 +1,316 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.recipes.locks.InterProcessMutex; +import com.netflix.curator.framework.recipes.locks.InterProcessReadWriteLock; + +/** + * Tests curator's InterProcessReadWriteLock implementation + */ +@Category(MediumTests.class) +public class TestInterProcessReadWriteLock { + + private static final Log LOG = + LogFactory.getLog(TestInterProcessReadWriteLock.class); + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final int NUM_THREADS = 10; + + private static Configuration conf; + + private final AtomicBoolean isLockHeld = new 
AtomicBoolean(false); + private final ExecutorService executor = + Executors.newFixedThreadPool(NUM_THREADS, + new DaemonThreadFactory("TestDistributedReadWriteLock-")); + + @BeforeClass + public static void beforeAllTests() throws Exception { + conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniZKCluster(); + conf.setInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT, 1000); + ZooKeeperWatcher zkw = getZooKeeperWatcher("setup"); + ZKUtil.createWithParents(zkw, zkw.tableLockZNode); + } + + @AfterClass + public static void afterAllTests() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @After + public void tearDown() { + executor.shutdown(); + } + + private static ZooKeeperWatcher getZooKeeperWatcher(String desc) + throws IOException { + return TEST_UTIL.getZooKeeperWatcher(); + } + + private static CuratorFramework getCuratorClient() throws IOException { + return getZooKeeperWatcher("").getRecoverableZooKeeper().getCuratorClient(); + } + + + @Test(timeout = 30000) + public void testWriteLockExcludesWriters() throws Exception { + final String testName = "testWriteLockExcludesWriters"; + CuratorFramework curator = getCuratorClient(); + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + + List> results = Lists.newArrayList(); + for (int i = 0; i < NUM_THREADS; ++i) { + final String threadDesc = testName + i; + results.add(executor.submit(new Callable() { + @Override + public Void call() throws Exception { + InterProcessMutex writeLock = lock.writeLock(); + try { + writeLock.acquire(); + try { + // No one else should hold the lock + assertTrue(isLockHeld.compareAndSet(false, true)); + Thread.sleep(1000); + // No one else should have released the lock + assertTrue(isLockHeld.compareAndSet(true, false)); + } finally { + isLockHeld.set(false); + writeLock.release(); + } + } catch (InterruptedException e) { + LOG.warn(threadDesc + " interrupted", e); + Thread.currentThread().interrupt(); + throw new 
InterruptedIOException(); + } + return null; + } + })); + + } + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 30000) + public void testReadLockDoesNotExcludeReaders() throws Exception { + final String testName = "testReadLockDoesNotExcludeReaders"; + CuratorFramework curator = getCuratorClient(); + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + + final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS); + final AtomicInteger locksHeld = new AtomicInteger(0); + List> results = Lists.newArrayList(); + for (int i = 0; i < NUM_THREADS; ++i) { + results.add(executor.submit(new Callable() { + @Override + public Void call() throws Exception { + InterProcessMutex readLock = lock.readLock(); + readLock.acquire(); + try { + locksHeld.incrementAndGet(); + locksAcquiredLatch.countDown(); + Thread.sleep(1000); + } finally { + readLock.release(); + locksHeld.decrementAndGet(); + } + return null; + } + })); + } + locksAcquiredLatch.await(); + assertEquals(locksHeld.get(), NUM_THREADS); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 3000) + public void testReadLockExcludesWriters() throws Exception { + // Submit a read lock request first + // Submit a write lock request second + final String testName = "testReadLockExcludesWriters"; + List> results = Lists.newArrayList(); + final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1); + final CuratorFramework curator = getCuratorClient(); + Callable acquireReadLock = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex readLock = lock.readLock(); + + readLock.acquire(); + try { + assertTrue(isLockHeld.compareAndSet(false, true)); + readLockAcquiredLatch.countDown(); + Thread.sleep(1000); + } finally { + isLockHeld.set(false); + readLock.release(); + } + return null; + } + }; + 
Callable acquireWriteLock = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex writeLock = lock.writeLock(); + readLockAcquiredLatch.await(); + assertTrue(isLockHeld.get()); + writeLock.acquire(); + try { + assertFalse(isLockHeld.get()); + } finally { + writeLock.release(); + } + return null; + } + }; + results.add(executor.submit(acquireReadLock)); + results.add(executor.submit(acquireWriteLock)); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 30000) + public void testWriteLockExcludesReaders() throws Exception { + // Submit a read lock request first + // Submit a write lock request second + final String testName = "testReadLockExcludesWriters"; + final CuratorFramework curator = getCuratorClient(); + List> results = Lists.newArrayList(); + final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1); + Callable acquireWriteLock = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex writeLock = lock.writeLock(); + writeLock.acquire(); + try { + writeLockAcquiredLatch.countDown(); + assertTrue(isLockHeld.compareAndSet(false, true)); + Thread.sleep(1000); + } finally { + isLockHeld.set(false); + writeLock.release(); + } + return null; + } + }; + Callable acquireReadLock = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-acquireReadLock"; + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex readLock = lock.readLock(); + writeLockAcquiredLatch.await(); + readLock.acquire(); + try { + assertFalse(isLockHeld.get()); + } finally { + readLock.release(); + } + return null; + } + }; + results.add(executor.submit(acquireWriteLock)); + 
results.add(executor.submit(acquireReadLock)); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 60000) + public void testTimeout() throws Exception { + final String testName = "testTimeout"; + final CuratorFramework curator = getCuratorClient(); + final CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + Callable shouldHog = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex writeLock = lock.writeLock(); + + writeLock.acquire(); + lockAcquiredLatch.countDown(); + Thread.sleep(10000); + writeLock.release(); + return null; + } + }; + Callable shouldTimeout = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex readLock = lock.readLock(); + lockAcquiredLatch.await(); + assertFalse(readLock.acquire(5000, TimeUnit.MILLISECONDS)); + return null; + } + }; + Callable shouldAcquireLock = new Callable() { + @Override + public Void call() throws Exception { + final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, "/" + testName); + InterProcessMutex writeLock = lock.writeLock(); + + lockAcquiredLatch.await(); + assertTrue(writeLock.acquire(30000, TimeUnit.MILLISECONDS)); + writeLock.release(); + return null; + } + }; + List> results = Lists.newArrayList(); + results.add(executor.submit(shouldHog)); + results.add(executor.submit(shouldTimeout)); + results.add(executor.submit(shouldAcquireLock)); + MultithreadedTestUtil.assertOnFutures(results); + } +}