diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1e24b8c..39bb622 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -122,6 +122,20 @@ public final class HConstants {
public static final String ZOOKEEPER_SESSION_TIMEOUT =
"zookeeper.session.timeout";
+ /** Configuration key for enabling table-level locks for schema changes */
+ public static final String MASTER_TABLE_LOCK_ENABLE =
+ "hbase.table.lock.enable";
+
+ /** By default, table-level locks for schema changes are enabled */
+ public static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
+
+ /** Configuration key for time out for trying to acquire table locks */
+ public static final String TABLE_LOCK_TIMEOUT_MS =
+ "hbase.table.lock.timeout.ms";
+
+ public static final int DEFAULT_TABLE_LOCK_TIMEOUT_MS =
+ 600 * 1000; //10 min default
+
/** Name of ZooKeeper quorum configuration parameter. */
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
@@ -745,7 +759,7 @@ public final class HConstants {
Arrays.asList(new String[] { HREGION_LOGDIR_NAME, HREGION_OLDLOGDIR_NAME, CORRUPT_DIR_NAME,
toString(META_TABLE_NAME), toString(ROOT_TABLE_NAME), SPLIT_LOGDIR_NAME,
HBCK_SIDELINEDIR_NAME, HFILE_ARCHIVE_DIRECTORY }));
-
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java
new file mode 100644
index 0000000..cd742ad
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class TableLockTimeoutException extends IOException {
+
+ private static final long serialVersionUID = -1770764924258999825L;
+
+ /** Default constructor */
+ public TableLockTimeoutException() {
+ super();
+ }
+
+ public TableLockTimeoutException(String s) {
+ super(s);
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 9b5f83f..05c2215 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
+import com.google.common.collect.LinkedHashMultimap;
+
/**
* Manages and performs region assignment.
*
@@ -111,6 +111,8 @@ public class AssignmentManager extends ZooKeeperListener {
private LoadBalancer balancer;
+ private final TableLockManager tableLockManager;
+
final private KeyLocker locker = new KeyLocker();
/**
@@ -177,7 +179,8 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public AssignmentManager(Server server, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
- final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException {
+ final ExecutorService service, MetricsMaster metricsMaster,
+ final TableLockManager tableLockManager) throws KeeperException, IOException {
super(server.getZooKeeper());
this.server = server;
this.serverManager = serverManager;
@@ -195,6 +198,7 @@ public class AssignmentManager extends ZooKeeperListener {
Threads.setDaemonThreadRunning(timerUpdater.getThread(),
server.getServerName() + ".timerUpdater");
this.zkTable = new ZKTable(this.watcher);
+ this.tableLockManager = tableLockManager;
this.maximumAttempts =
this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer;
@@ -2316,8 +2320,8 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("The table " + tableName
+ " is in DISABLING state. Hence recovering by moving the table"
+ " to DISABLED state.");
- new DisableTableHandler(this.server, tableName.getBytes(),
- catalogTracker, this, true).process();
+ new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker,
+ this, tableLockManager, true).process();
}
}
}
@@ -2342,7 +2346,7 @@ public class AssignmentManager extends ZooKeeperListener {
// enableTable in sync way during master startup,
// no need to invoke coprocessor
new EnableTableHandler(this.server, tableName.getBytes(),
- catalogTracker, this, true).process();
+ catalogTracker, this, tableLockManager, true).process();
}
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d5f1f63..f45bc30 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -40,11 +40,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
-import com.google.common.collect.Maps;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -167,12 +162,13 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.InfoServer;
@@ -192,10 +188,13 @@ import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
-import org.apache.hadoop.hbase.trace.SpanReceiverHost;
-import org.apache.hadoop.hbase.util.FSUtils;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
@@ -320,6 +319,9 @@ Server {
private Map coprocessorServiceHandlers = Maps.newHashMap();
+ // Table level lock manager for schema changes
+ private TableLockManager tableLockManager;
+
/**
* Initializes the HMaster. The steps are as follows:
*
@@ -399,6 +401,7 @@ Server {
this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
+
}
/**
@@ -538,7 +541,8 @@ Server {
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager,
- this.catalogTracker, this.balancer, this.executorService, this.metricsMaster);
+ this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
+ this.tableLockManager);
zooKeeper.registerListenerFirst(assignmentManager);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
@@ -673,6 +677,11 @@ Server {
startServiceThreads();
}
+ this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
+ if (!masterRecovery) {
+ this.tableLockManager.reapAllTableWriteLocks();
+ }
+
// Wait for region servers to report in.
this.serverManager.waitForRegionServers(status);
// Check zk for region servers that are up but didn't register
@@ -1018,6 +1027,11 @@ Server {
return this.zooKeeper;
}
+ @Override
+ public TableLockManager getTableLockManager() {
+ return this.tableLockManager;
+ }
+
/*
* Start up all services. If any of these threads gets an unhandled exception
* then they just die with a logged message. This should be fine because
@@ -1342,6 +1356,14 @@ Server {
return balancerRan;
}
+ protected void lockTable(byte[] tableName, String purpose) throws IOException {
+ tableLockManager.lockTable(tableName, purpose);
+ }
+
+ protected void unlockTable(byte[] tableName) throws IOException {
+ tableLockManager.unlockTable(tableName);
+ }
+
@Override
public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
try {
@@ -1496,11 +1518,10 @@ Server {
this.executorService.submit(new CreateTableHandler(this,
this.fileSystemManager, hTableDescriptor, conf,
- newRegions, catalogTracker, assignmentManager));
+ newRegions, this));
if (cpHost != null) {
cpHost.postCreateTable(hTableDescriptor, newRegions);
}
-
}
private void checkCompression(final HTableDescriptor htd)
@@ -1696,7 +1717,7 @@ Server {
cpHost.preEnableTable(tableName);
}
this.executorService.submit(new EnableTableHandler(this, tableName,
- catalogTracker, assignmentManager, false));
+ catalogTracker, assignmentManager, tableLockManager, false));
if (cpHost != null) {
cpHost.postEnableTable(tableName);
}
@@ -1720,7 +1741,7 @@ Server {
cpHost.preDisableTable(tableName);
}
this.executorService.submit(new DisableTableHandler(this, tableName,
- catalogTracker, assignmentManager, false));
+ catalogTracker, assignmentManager, tableLockManager, false));
if (cpHost != null) {
cpHost.postDisableTable(tableName);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f9aa860..03d6b75 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
-import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import com.google.protobuf.Service;
+
/**
* Services Master supplies
*/
@@ -56,6 +57,11 @@ public interface MasterServices extends Server {
public ExecutorService getExecutorService();
/**
+ * @return Master's instance of {@link TableLockManager}
+ */
+ public TableLockManager getTableLockManager();
+
+ /**
* Check table is modifiable; i.e. exists and is offline.
* @param tableName Name of table to check.
* @throws TableNotDisabledException
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
new file mode 100644
index 0000000..c7dc21c
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableLockTimeoutException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.locks.InterProcessLock;
+import com.netflix.curator.framework.recipes.locks.InterProcessMutex;
+import com.netflix.curator.framework.recipes.locks.InterProcessReadWriteLock;
+
+/**
+ * A manager for distributed table level locks.
+ */
+public abstract class TableLockManager {
+
+ private static final Log LOG = LogFactory.getLog(TableLockManager.class);
+
+ /**
+ * Lock a table, given a purpose.
+ * @param tableName Table to lock
+ * @param purpose Human readable reason for locking the table
+ * @throws TableLockTimeoutException If unable to acquire a lock within a
+ * specified time period (if any)
+ * @throws IOException If unrecoverable ZooKeeper error occurs
+ */
+ public abstract void lockTable(byte[] tableName, String purpose) throws IOException;
+
+ /**
+ * Lock a table, given a purpose. Does not throw an exception.
+ * @param tableName Table to lock
+ * @param purpose Human readable reason for locking the table
+ * @return whether lockTable was successful
+ */
+ public boolean lockTableFailSilently(byte[] tableName, String purpose) {
+ try {
+ lockTable(tableName, purpose);
+ return true;
+ } catch (Exception ex) {
+ LOG.warn("locking table " + Bytes.toString(tableName) + " failed with: " +
+ StringUtils.stringifyException(ex));
+ }
+ return false;
+ }
+
+ /**
+ * Unlock a table
+ * @param tableName Table to unlock
+ * @throws IOException If unrecoverable ZooKeeper error occurs
+ */
+ public abstract void unlockTable(byte[] tableName) throws IOException;
+
+ /**
+ * Unlock a table. Does not throw an exception; failures to
+ * unlock are logged as a warning instead.
+ * @param tableName Table to unlock
+ */
+ public void unlockTableFailSilently(byte[] tableName) {
+ try {
+ unlockTable(tableName);
+ } catch (Exception ex) {
+ LOG.warn("unlocking table " + Bytes.toString(tableName) + " failed with: " +
+ StringUtils.stringifyException(ex));
+ }
+ }
+
+ /**
+ * Force releases all table write locks even if this thread does not
+ * own the lock. The behavior of the lock holders still thinking that they
+ * have the lock is undefined. This should be used carefully and only when
+ * we can ensure that all write-lock holders have died. For example if only
+ * the master can hold write locks, then we can reap its locks when the backup
+ * master starts.
+ */
+ public abstract void reapAllTableWriteLocks() throws IOException;
+
+ /**
+ * Creates and returns a TableLockManager according to the configuration
+ */
+ public static TableLockManager createTableLockManager(Configuration conf,
+ ZooKeeperWatcher zkWatcher, ServerName serverName) {
+ // Initialize table level lock manager for schema changes, if enabled.
+ if (conf.getBoolean(HConstants.MASTER_TABLE_LOCK_ENABLE,
+ HConstants.DEFAULT_TABLE_LOCK_ENABLE)) {
+ int lockTimeoutMs = conf.getInt(HConstants.TABLE_LOCK_TIMEOUT_MS,
+ HConstants.DEFAULT_TABLE_LOCK_TIMEOUT_MS);
+ return new CuratorTableLockManager(zkWatcher, serverName, lockTimeoutMs);
+ //return new ZKTableLockManager(zkWatcher, serverName, schemaChangeLockTimeoutMs);
+ } else {
+ return new NullTableLockManager();
+ }
+ }
+
+ /**
+ * A null implementation
+ */
+ public static class NullTableLockManager extends TableLockManager {
+ @Override
+ public void lockTable(byte[] tableName, String purpose) throws IOException {
+ }
+ @Override
+ public void unlockTable(byte[] tableName) throws IOException {
+ }
+ @Override
+ public void reapAllTableWriteLocks() throws IOException {
+ }
+ }
+
+ private static class CuratorTableLockManager extends TableLockManager {
+
+ private final ServerName serverName;
+ private final ZooKeeperWatcher zkWatcher;
+ private final long lockTimeoutMs;
+
+ // IPLocks are reentrant in the same thread. We are caching one lock per table.
+ private final ConcurrentMap locks;
+
+ /**
+ * Copied from InterProcessReadWriteLock.java
+ */
+ private static final String WRITE_LOCK_NAME = "__WRIT__";
+
+ public CuratorTableLockManager(ZooKeeperWatcher zkWatcher, ServerName serverName, long lockTimeoutMs) {
+ locks = new ConcurrentHashMap();
+ this.zkWatcher = zkWatcher;
+ this.serverName = serverName;
+ this.lockTimeoutMs = lockTimeoutMs;
+ }
+
+ /**
+ * Lock a table, given a purpose.
+ * @param tableName Table to lock
+ * @param purpose Human readable reason for locking the table
+ * @throws TableLockTimeoutException If unable to acquire a lock within a
+ * specified time period (if any)
+ * @throws IOException If unrecoverable ZooKeeper error occurs
+ */
+ public void lockTable(byte[] tableName, String purpose) throws IOException {
+ String tableNameStr = Bytes.toString(tableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempt to acquire table lock on :" + tableNameStr + " for:" + purpose);
+ }
+ InterProcessReadWriteLock lock = createOrGetTableLock(tableNameStr, purpose);
+
+ InterProcessLock writeLock = lock.writeLock();
+
+ try {
+ if (lockTimeoutMs < 0) {
+ writeLock.acquire();
+ } else {
+ if (!writeLock.acquire(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
+ throw new TableLockTimeoutException("Timed out acquiring " +
+ "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms.");
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired table lock on :" + tableNameStr + " for:" + purpose);
+ }
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted acquiring a lock for " + tableNameStr, ex);
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted acquiring a lock");
+ } catch (IOException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ /** Returns a reentrant lock or creates one for this process */
+ private InterProcessReadWriteLock createOrGetTableLock(String tableName, String purpose) {
+ InterProcessReadWriteLock lock = locks.get(tableName);
+ if (lock != null) {
+ return lock;
+ }
+
+ String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
+
+ lock = new InterProcessReadWriteLock(getCuratorClient(), tableLockZNode);
+ InterProcessReadWriteLock returned = locks.putIfAbsent(tableName, lock);
+ return returned == null ? lock : returned;
+ }
+
+ private CuratorFramework getCuratorClient() {
+ return zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ }
+
+ public void unlockTable(byte[] tableName) throws IOException {
+ String tableNameStr = Bytes.toString(tableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempt to release table lock on :" + tableNameStr);
+ }
+
+ InterProcessReadWriteLock lock = createOrGetTableLock(tableNameStr, "unlock");
+ InterProcessLock writeLock = lock.writeLock();
+
+ try {
+ writeLock.release();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Released table lock on : " + tableNameStr);
+ }
+ if (!writeLock.isAcquiredInThisProcess()) {
+ locks.remove(tableNameStr); //TODO: race condition between get and remove
+ }
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while releasing a lock for " + tableNameStr, ex);
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ } catch (IOException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public void reapAllTableWriteLocks() throws IOException {
+ //get the table names
+ try {
+ CuratorFramework curatorClient = getCuratorClient();
+ List tableNames = curatorClient.getChildren().forPath(zkWatcher.tableLockZNode);
+
+ for (String tableName : tableNames) {
+ String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
+ InterProcessReadWriteLock lock
+ = new InterProcessReadWriteLock(curatorClient, tableLockZNode);
+ InterProcessMutex writeLock = lock.writeLock();
+ Collection znodes = writeLock.getParticipantNodes();
+ for (String znode : znodes) {
+ if (znode.contains(WRITE_LOCK_NAME)) {
+ LOG.info("Reaping table write lock for table:" + tableName +
+ " znode:" + znode);
+ curatorClient.delete().guaranteed().forPath(znode);
+ }
+ }
+ }
+ } catch (IOException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ LOG.warn("Caught exception while reaping table write locks", ex);
+ }
+ }
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
index 0130f51..e766d75 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
@@ -65,11 +67,12 @@ public class CreateTableHandler extends EventHandler {
private Configuration conf;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
+ private final TableLockManager tableLockManager;
private final HRegionInfo [] newRegions;
public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions,
- CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+ MasterServices masterServices)
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
super(server, EventType.C_M_CREATE_TABLE);
@@ -77,8 +80,9 @@ public class CreateTableHandler extends EventHandler {
this.hTableDescriptor = hTableDescriptor;
this.conf = conf;
this.newRegions = newRegions;
- this.catalogTracker = catalogTracker;
- this.assignmentManager = assignmentManager;
+ this.catalogTracker = masterServices.getCatalogTracker();
+ this.assignmentManager = masterServices.getAssignmentManager();
+ this.tableLockManager = masterServices.getTableLockManager();
int timeout = conf.getInt("hbase.client.catalog.timeout", 10000);
// Need META availability to create a table
@@ -126,8 +130,14 @@ public class CreateTableHandler extends EventHandler {
@Override
public void process() {
String tableName = this.hTableDescriptor.getNameAsString();
+ LOG.info("Attempting to create the table " + tableName);
+
+ //acquire table lock before calling the coprocessors
+
+ if (!tableLockManager.lockTableFailSilently(hTableDescriptor.getName(), "create")) {
+ return;
+ }
try {
- LOG.info("Attempting to create the table " + tableName);
MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost();
if (cpHost != null) {
cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions);
@@ -140,6 +150,8 @@ public class CreateTableHandler extends EventHandler {
LOG.error("Error trying to create the table " + tableName, e);
} catch (KeeperException e) {
LOG.error("Error trying to create the table " + tableName, e);
+ } finally {
+ tableLockManager.unlockTableFailSilently(hTableDescriptor.getName());
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
index 3a6b629..66a3c4c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.cloudera.htrace.Trace;
@@ -50,15 +51,17 @@ public class DisableTableHandler extends EventHandler {
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
+ private final TableLockManager tableLockManager;
public DisableTableHandler(Server server, byte [] tableName,
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
- boolean skipTableStateCheck)
+ TableLockManager tableLockManager, boolean skipTableStateCheck)
throws TableNotFoundException, TableNotEnabledException, IOException {
super(server, EventType.C_M_DISABLE_TABLE);
this.tableName = tableName;
this.tableNameStr = Bytes.toString(this.tableName);
this.assignmentManager = assignmentManager;
+ this.tableLockManager = tableLockManager;
// Check if table exists
// TODO: do we want to keep this in-memory as well? i guess this is
// part of old master rewrite, schema to zk to check for table
@@ -100,6 +103,9 @@ public class DisableTableHandler extends EventHandler {
public void process() {
try {
LOG.info("Attempting to disable table " + this.tableNameStr);
+ if (!tableLockManager.lockTableFailSilently(tableName, "disable")) {
+ return;
+ }
MasterCoprocessorHost cpHost = ((HMaster) this.server)
.getCoprocessorHost();
if (cpHost != null) {
@@ -113,6 +119,8 @@ public class DisableTableHandler extends EventHandler {
LOG.error("Error trying to disable table " + this.tableNameStr, e);
} catch (KeeperException e) {
LOG.error("Error trying to disable table " + this.tableNameStr, e);
+ } finally {
+ tableLockManager.unlockTableFailSilently(tableName);
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
index f46c870..b825535 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@@ -55,18 +56,20 @@ public class EnableTableHandler extends EventHandler {
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
+ private final TableLockManager tableLockManager;
private final CatalogTracker ct;
private boolean retainAssignment = false;
public EnableTableHandler(Server server, byte [] tableName,
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
- boolean skipTableStateCheck)
+ TableLockManager tableLockManager, boolean skipTableStateCheck)
throws TableNotFoundException, TableNotDisabledException, IOException {
super(server, EventType.C_M_ENABLE_TABLE);
this.tableName = tableName;
this.tableNameStr = Bytes.toString(tableName);
this.ct = catalogTracker;
this.assignmentManager = assignmentManager;
+ this.tableLockManager = tableLockManager;
this.retainAssignment = skipTableStateCheck;
// Check if table exists
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
@@ -106,6 +109,9 @@ public class EnableTableHandler extends EventHandler {
public void process() {
try {
LOG.info("Attempting to enable the table " + this.tableNameStr);
+ if (!tableLockManager.lockTableFailSilently(tableName, "enable")) {
+ return;
+ }
MasterCoprocessorHost cpHost = ((HMaster) this.server)
.getCoprocessorHost();
if (cpHost != null) {
@@ -121,6 +127,8 @@ public class EnableTableHandler extends EventHandler {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
} catch (InterruptedException e) {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
+ } finally {
+ tableLockManager.unlockTableFailSilently(tableName);
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
index c6f70e9..a2a06e5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
@@ -41,7 +41,7 @@ public class TableDeleteFamilyHandler extends TableEventHandler {
public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
Server server, final MasterServices masterServices) throws IOException {
- super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
+ super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices);
HTableDescriptor htd = getTableDescriptor();
this.familyName = hasColumnFamily(htd, familyName);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
index aedfa96..c1badbf 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
@@ -93,6 +93,11 @@ public abstract class TableEventHandler extends EventHandler {
try {
LOG.info("Handling table operation " + eventType + " on table " +
Bytes.toString(tableName));
+
+ if (!masterServices.getTableLockManager().lockTableFailSilently(
+ tableName, eventType.toString())) {
+ return;
+ }
List<HRegionInfo> hris =
MetaReader.getTableRegions(this.server.getCatalogTracker(),
tableName);
@@ -112,6 +117,7 @@ public abstract class TableEventHandler extends EventHandler {
} catch (KeeperException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} finally {
+ masterServices.getTableLockManager().unlockTableFailSilently(tableName);
// notify the waiting thread that we're done persisting the request
setPersist();
}
@@ -141,7 +147,7 @@ public abstract class TableEventHandler extends EventHandler {
reRegions.add(hri);
serverToRegions.get(rsLocation).add(hri);
}
-
+
LOG.info("Reopening " + reRegions.size() + " regions on "
+ serverToRegions.size() + " region servers.");
this.masterServices.getAssignmentManager().setRegionsToReopen(reRegions);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
index 8758f9e..f333a51 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
@@ -18,8 +18,11 @@
*/
package org.apache.hadoop.hbase;
-import java.util.Set;
import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -91,7 +94,7 @@ public abstract class MultithreadedTestUtil {
stopped = s;
}
}
-
+
public void stop() throws Exception {
synchronized (this) {
stopped = true;
@@ -130,7 +133,7 @@ public abstract class MultithreadedTestUtil {
this.stopped = true;
}
}
-
+
/**
* A test thread that performs a repeating operation.
*/
@@ -138,13 +141,48 @@ public abstract class MultithreadedTestUtil {
public RepeatingTestThread(TestContext ctx) {
super(ctx);
}
-
+
public final void doWork() throws Exception {
while (ctx.shouldRun() && !stopped) {
doAnAction();
}
}
-
+
public abstract void doAnAction() throws Exception;
}
+
+ /**
+ * Verify that no assertions have failed inside a future.
+ * Used for unit tests that spawn threads. E.g.,
+ *
+ *
+ * List<Future<Void>> results = Lists.newArrayList();
+ * Future<Void> f = executor.submit(new Callable<Void>() {
+ * public Void call() {
+ * assertTrue(someMethod());
+ * }
+ * });
+ * results.add(f);
+ * assertOnFutures(results);
+ *
+ * @param threadResults A list of futures
+ * @param <T> the result type of the futures being checked
+ * @throws InterruptedException If interrupted when waiting for a result
+ * from one of the futures
+ * @throws ExecutionException If an exception other than AssertionError
+ * occurs inside any of the futures
+ */
+ public static <T> void assertOnFutures(List<Future<T>> threadResults)
+ throws InterruptedException, ExecutionException {
+ for (Future<T> threadResult : threadResults) {
+ try {
+ threadResult.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof AssertionError) {
+ throw (AssertionError) e.getCause();
+ }
+ throw e;
+ }
+ }
+ }
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 8880fa0..9282411 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -181,7 +182,7 @@ public class TestAssignmentManager {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
- * @throws DeserializationException
+ * @throws DeserializationException
*/
@Test(timeout = 5000)
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
@@ -339,7 +340,7 @@ public class TestAssignmentManager {
* from one server to another mocking regionserver responding over zk.
* @throws IOException
* @throws KeeperException
- * @throws DeserializationException
+ * @throws DeserializationException
*/
@Test
public void testBalance()
@@ -354,7 +355,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
- this.serverManager, ct, balancer, executor, null);
+ this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
am.failoverCleanupDone.set(true);
try {
// Make sure our new AM gets callbacks; once registered, can't unregister.
@@ -435,7 +436,7 @@ public class TestAssignmentManager {
* To test closed region handler to remove rit and delete corresponding znode
* if region in pending close or closing while processing shutdown of a region
* server.(HBASE-5927).
- *
+ *
* @throws KeeperException
* @throws IOException
* @throws ServiceException
@@ -446,12 +447,12 @@ public class TestAssignmentManager {
testCaseWithPartiallyDisabledState(Table.State.DISABLING);
testCaseWithPartiallyDisabledState(Table.State.DISABLED);
}
-
-
+
+
/**
* To test if the split region is removed from RIT if the region was in SPLITTING state but the RS
* has actually completed the splitting in META but went down. See HBASE-6070 and also HBASE-5806
- *
+ *
* @throws KeeperException
* @throws IOException
*/
@@ -462,7 +463,7 @@ public class TestAssignmentManager {
// false indicate the region is not split
testCaseWithSplitRegionPartial(false);
}
-
+
private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException,
IOException, NodeExistsException, InterruptedException, ServiceException {
// Create and startup an executor. This is used by AssignmentManager
@@ -523,7 +524,7 @@ public class TestAssignmentManager {
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
- this.serverManager, ct, balancer, executor, null);
+ this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
// adding region to regions and servers maps.
am.regionOnline(REGIONINFO, SERVERNAME_A);
// adding region in pending close.
@@ -644,7 +645,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
- this.serverManager, ct, balancer, null, null);
+ this.serverManager, ct, balancer, null, null, master.getTableLockManager());
try {
// First make sure my mock up basically works. Unassign a region.
unassign(am, SERVERNAME_A, hri);
@@ -672,7 +673,7 @@ public class TestAssignmentManager {
* Tests the processDeadServersAndRegionsInTransition should not fail with NPE
* when it failed to get the children. Let's abort the system in this
* situation
- * @throws ServiceException
+ * @throws ServiceException
*/
@Test(timeout = 5000)
public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE()
@@ -701,7 +702,7 @@ public class TestAssignmentManager {
}
}
/**
- * TestCase verifies that the regionPlan is updated whenever a region fails to open
+ * TestCase verifies that the regionPlan is updated whenever a region fails to open
* and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546).
*/
@Test(timeout = 5000)
@@ -788,7 +789,7 @@ public class TestAssignmentManager {
this.gate.set(true);
return randomServerName;
}
-
+
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
@@ -829,7 +830,7 @@ public class TestAssignmentManager {
/**
* Test verifies whether assignment is skipped for regions of tables in DISABLING state during
* clean cluster startup. See HBASE-6281.
- *
+ *
* @throws KeeperException
* @throws IOException
* @throws Exception
@@ -875,7 +876,7 @@ public class TestAssignmentManager {
/**
* Test verifies whether all the enabling table regions assigned only once during master startup.
- *
+ *
* @throws KeeperException
* @throws IOException
* @throws Exception
@@ -895,7 +896,8 @@ public class TestAssignmentManager {
try {
// set table in enabling state.
am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString());
- new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true)
+ new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(),
+ am, new NullTableLockManager(), true)
.process();
assertEquals("Number of assignments should be 1.", 1, assignmentCount);
assertTrue("Table should be enabled.",
@@ -1038,7 +1040,7 @@ public class TestAssignmentManager {
ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
- server, manager, ct, this.balancer, executor);
+ server, manager, ct, this.balancer, executor, new NullTableLockManager());
return am;
}
@@ -1057,8 +1059,9 @@ public class TestAssignmentManager {
public AssignmentManagerWithExtrasForTesting(
final Server master, final ServerManager serverManager,
final CatalogTracker catalogTracker, final LoadBalancer balancer,
- final ExecutorService service) throws KeeperException, IOException {
- super(master, serverManager, catalogTracker, balancer, service, null);
+ final ExecutorService service, final TableLockManager tableLockManager)
+ throws KeeperException, IOException {
+ super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager);
this.es = service;
this.ct = catalogTracker;
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 3073041..c2d4ba7 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
-import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -73,6 +72,7 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@Category(SmallTests.class)
@@ -229,6 +229,11 @@ public class TestCatalogJanitor {
}
@Override
+ public TableLockManager getTableLockManager() {
+ return null;
+ }
+
+ @Override
public void abort(String why, Throwable e) {
//no-op
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
new file mode 100644
index 0000000..315c077
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the default table lock manager
+ */
+@Category(MediumTests.class)
+public class TestTableLockManager {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestTableLockManager.class);
+
+ private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks");
+
+ private static final byte[] FAMILY = Bytes.toBytes("f1");
+
+ private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
+
+ private final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private static final CountDownLatch deleteColumn = new CountDownLatch(1);
+ private static final CountDownLatch addColumn = new CountDownLatch(1);
+
+ public void prepareMiniCluster() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+ TEST_UTIL.startMiniCluster(2);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ }
+
+ public void prepareMiniZkCluster() throws Exception {
+ TEST_UTIL.startMiniZKCluster(1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout = 60000)
+ public void testLockTimeoutException() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setInt(HConstants.TABLE_LOCK_TIMEOUT_MS, 3000);
+ prepareMiniCluster();
+ HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+ master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
+ 0, TEST_UTIL.getConfiguration());
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future