commit 71f709a70ad1330da26cecbb7bd05086914257a6 Author: Enis Soztutar Date: Fri Dec 7 16:02:18 2012 -0800 HBASE-5991. First patch, direct port of HBASE-5494 and HBASE-5991 diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 58c7d4c..f9d9820 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -122,6 +122,20 @@ public final class HConstants { public static final String ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout"; + /** Configuration key for enabling table-level locks for schema changes */ + public static final String MASTER_TABLE_LOCK_ENABLE = + "hbase.table.lock.enable"; + + /** by default we should enable table-level locks for schema changes */ + public static final boolean DEFAULT_TABLE_LOCK_ENABLE = true; + + /** Configuration key for time out for trying to acquire table locks */ + public static final String TABLE_LOCK_TIMEOUT_MS = + "hbase.table.lock.timeout.ms"; + + public static final int DEFAULT_TABLE_LOCK_TIMEOUT_MS = + 600 * 1000; //10 min default + /** Name of ZooKeeper quorum configuration parameter. */ public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; @@ -731,7 +745,7 @@ public final class HConstants { Arrays.asList(new String[] { HREGION_LOGDIR_NAME, HREGION_OLDLOGDIR_NAME, CORRUPT_DIR_NAME, toString(META_TABLE_NAME), toString(ROOT_TABLE_NAME), SPLIT_LOGDIR_NAME, HBCK_SIDELINEDIR_NAME, HFILE_ARCHIVE_DIRECTORY })); - + private HConstants() { // Can't be instantiated with this ctor. } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/HLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/HLock.java new file mode 100644 index 0000000..67e5858 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/HLock.java @@ -0,0 +1,74 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** + * An interface for an application-specific lock. + */ +public interface HLock { + + /** + * Acquire the lock, waiting indefinitely until the lock is released or + * the thread is interrupted. + * @throws IOException + * @throws InterruptedException If current thread is interrupted while + * waiting for the lock + */ + public void acquire() throws IOException, InterruptedException; + + /** + * Acquire the lock within a wait time. 
+ * @param timeoutMs The maximum time (in milliseconds) to wait for the lock, + * -1 to wait indefinitely + * @return True if the lock was acquired, false if waiting time elapsed + * before the lock was acquired + * @throws IOException If there is an unrecoverable error + * (e.g., when talking to a lock service) when acquiring + * the lock + * @throws InterruptedException If the thread is interrupted while waiting to + * acquire the lock + */ + public boolean tryAcquire(long timeoutMs) + throws IOException, InterruptedException; + + /** + * Release the lock. + * @throws IOException If there is an unrecoverable error releasing the lock + * @throws InterruptedException If the thread is interrupted while releasing + * the lock + */ + public void release() throws IOException, InterruptedException; + + /** + * An interface for objects that process lock metadata. + */ + public static interface MetadataHandler { + + /** + * Called after lock metadata is successfully read from a distributed + * lock service. This method may contain any procedures for, e.g., + * printing the metadata in a human-readable format. + * @param metadata The metadata + */ + public void handleMetadata(byte[] metadata); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/HReadWriteLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/HReadWriteLock.java new file mode 100644 index 0000000..ab874b0 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/HReadWriteLock.java @@ -0,0 +1,42 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * An interface for a distributed reader-writer lock. + */ +public interface HReadWriteLock { + + /** + * Obtain a reader lock containing given metadata. + * @param metadata Serialized lock metadata (this may contain information + * such as the process owning the lock or the purpose for + * which the lock was acquired). Must not be null. + * @return An instantiated HLock instance + */ + public HLock readLock(byte[] metadata); + + /** + * Obtain a writer lock containing given metadata.
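To make the contract of these two interfaces concrete, here is a minimal, purely in-process sketch backed by java.util.concurrent rather than ZooKeeper. It is illustrative only and not part of the patch; the class name LocalHReadWriteLock is hypothetical:

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.HLock;
import org.apache.hadoop.hbase.HReadWriteLock;

/** Hypothetical in-process implementation, illustrating the contract only. */
public class LocalHReadWriteLock implements HReadWriteLock {

  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

  private static class LocalHLock implements HLock {
    private final Lock delegate;
    LocalHLock(Lock delegate) { this.delegate = delegate; }

    @Override
    public void acquire() throws IOException, InterruptedException {
      delegate.lockInterruptibly();  // wait indefinitely
    }

    @Override
    public boolean tryAcquire(long timeoutMs)
        throws IOException, InterruptedException {
      if (timeoutMs == -1) { acquire(); return true; }
      return delegate.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void release() throws IOException, InterruptedException {
      delegate.unlock();
    }
  }

  @Override
  public HLock readLock(byte[] metadata) {  // metadata unused in-process
    return new LocalHLock(rwLock.readLock());
  }

  @Override
  public HLock writeLock(byte[] metadata) {
    return new LocalHLock(rwLock.writeLock());
  }
}

The ZooKeeper-backed implementation added later in this patch provides the same semantics across processes, with lock state held in znodes rather than JVM monitors.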
+ * @param metadata See documentation of metadata parameter in readLock() + * @return An instantiated DistributedLockInterface instance + */ + public HLock writeLock(byte[] metadata); +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java new file mode 100644 index 0000000..cd742ad --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java @@ -0,0 +1,37 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class TableLockTimeoutException extends IOException { + + private static final long serialVersionUID = -1770764924258999825L; + + /** Default constructor */ + public TableLockTimeoutException() { + super(); + } + + public TableLockTimeoutException(String s) { + super(s); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 12668e9..4056b1f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -109,6 +109,8 @@ public class AssignmentManager extends ZooKeeperListener { private LoadBalancer balancer; + private final TableLockManager tableLockManager; + final private KeyLocker locker = new KeyLocker(); /** @@ -175,7 +177,8 @@ public class AssignmentManager extends ZooKeeperListener { */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, - final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException { + final ExecutorService service, MetricsMaster metricsMaster, + final TableLockManager tableLockManager) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; @@ -210,6 +213,7 @@ public class AssignmentManager extends ZooKeeperListener { zkEventWorkers[i] = Threads.getBoundedCachedThreadPool( 1, 60L, TimeUnit.SECONDS, threadFactory); } + this.tableLockManager = tableLockManager; } void startTimeOutMonitor() { @@ -2245,8 +2249,8 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("The table " + tableName + " is in DISABLING state. 
Hence recovering by moving the table" + " to DISABLED state."); - new DisableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker, + this, tableLockManager, true).process(); } } } @@ -2271,7 +2275,7 @@ public class AssignmentManager extends ZooKeeperListener { // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + catalogTracker, this, tableLockManager, true).process(); } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c82b38f..4204c1a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -40,11 +40,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; -import com.google.common.collect.Maps; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -167,12 +162,13 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.InfoServer; @@ -192,10 +188,13 @@ import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import org.apache.hadoop.hbase.trace.SpanReceiverHost; -import org.apache.hadoop.hbase.util.FSUtils; +import com.google.common.collect.Maps; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import com.google.protobuf.ServiceException; /** @@ -320,6 +319,9 @@ Server { private Map coprocessorServiceHandlers = Maps.newHashMap(); + // Table level lock manager for schema changes + private final TableLockManager tableLockManager; + /** * Initializes the HMaster. The steps are as follows: *
@@ -394,6 +396,8 @@ Server { this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this)); + + this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName); } /** @@ -533,7 +537,8 @@ Server { this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); this.loadBalancerTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.balancer, this.executorService, this.metricsMaster); + this.catalogTracker, this.balancer, this.executorService, this.metricsMaster, + this.tableLockManager); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, @@ -1013,6 +1018,11 @@ Server { return this.zooKeeper; } + @Override + public TableLockManager getTableLockManager() { + return this.tableLockManager; + } + /* * Start up all services. If any of these threads gets an unhandled exception * then they just die with a logged message. This should be fine because @@ -1337,6 +1347,14 @@ Server { return balancerRan; } + protected void lockTable(byte[] tableName, String purpose) throws IOException { + tableLockManager.lockTable(tableName, purpose); + } + + protected void unlockTable(byte[] tableName) throws IOException { + tableLockManager.unlockTable(tableName); + } + @Override public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException { try { @@ -1491,11 +1509,10 @@ Server { this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, - newRegions, catalogTracker, assignmentManager)); + newRegions, this)); if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } - } private void checkCompression(final HTableDescriptor htd) @@ -1677,7 +1694,7 @@ Server { cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false)); if (cpHost != null) { cpHost.postEnableTable(tableName); @@ -1697,8 +1714,8 @@ Server { if (cpHost != null) { cpHost.preDisableTable(tableName); } - this.executorService.submit(new DisableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + this.executorService.submit(new DisableTableHandler(this, tableName, catalogTracker, + assignmentManager, tableLockManager, false)); if (cpHost != null) { cpHost.postDisableTable(tableName); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 69f853c..919d654 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -20,13 +20,15 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; -import com.google.protobuf.Service; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.executor.ExecutorService; +import com.google.protobuf.Service; + /** * Services Master supplies */ @@ -53,10 +55,15 @@ public interface MasterServices extends Server { public ExecutorService 
getExecutorService(); /** + * @return Master's instance of {@link TableLockManager} + */ + public TableLockManager getTableLockManager(); + + /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. * @throws TableNotDisabledException - * @throws TableNotFoundException + * @throws TableNotFoundException */ public void checkTableModifiable(final byte [] tableName) throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java new file mode 100644 index 0000000..58b2c46 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java @@ -0,0 +1,239 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HLock; +import org.apache.hadoop.hbase.HLock.MetadataHandler; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableLockTimeoutException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.lock.HReadWriteLockImpl; +import org.apache.hadoop.util.StringUtils; + +/** + * A manager for distributed table level locks. + */ +public abstract class TableLockManager { + + private static final Log LOG = LogFactory.getLog(TableLockManager.class); + + private static final MetadataHandler METADATA_HANDLER = + new MetadataHandler() { + @Override + public void handleMetadata(byte[] ownerMetadata) { + LOG.info("Table is locked: " + Bytes.toString(ownerMetadata)); + } + }; + + /** + * Lock a table, given a purpose. + * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @throws TableLockTimeoutException If unable to acquire a lock within a + * specified time period (if any) + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public abstract void lockTable(byte[] tableName, String purpose) throws IOException; + + /** + * Lock a table, given a purpose. Does not throw an exception. 
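A caller-side sketch for the lockTable()/unlockTable() API above; the helper name runSchemaChange and its body are hypothetical, and exception handling beyond the lock is left to the call site:

void runSchemaChange(TableLockManager lockManager, byte[] tableName)
    throws IOException {
  // Throws TableLockTimeoutException if the lock cannot be acquired
  // within the configured hbase.table.lock.timeout.ms.
  lockManager.lockTable(tableName, "alter");
  try {
    // ... perform the schema change while the table is locked ...
  } finally {
    // The fail-silent variant only logs, so it cannot mask an exception
    // already propagating out of the try block.
    lockManager.unlockTableFailSilently(tableName);
  }
}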
+ * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @return whether lockTable was successful + */ + public boolean lockTableFailSilently(byte[] tableName, String purpose) { + try { + lockTable(tableName, purpose); + return true; + } catch (Exception ex) { + LOG.warn("locking table " + Bytes.toString(tableName) + " failed with: " + + StringUtils.stringifyException(ex)); + } + return false; + } + + /** + * Unlock a table. + * @param tableName Table to unlock + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public abstract void unlockTable(byte[] tableName) throws IOException; + + /** + * Unlock a table. Does not throw an exception. + * @param tableName Table to unlock + */ + public void unlockTableFailSilently(byte[] tableName) { + try { + unlockTable(tableName); + } catch (Exception ex) { + LOG.warn("unlocking table " + Bytes.toString(tableName) + " failed with: " + + StringUtils.stringifyException(ex)); + } + } + + /** + * Creates and returns a TableLockManager according to the configuration. + */ + public static TableLockManager createTableLockManager(Configuration conf, + ZooKeeperWatcher zkWatcher, ServerName serverName) { + // Initialize table level lock manager for schema changes, if enabled. + if (conf.getBoolean(HConstants.MASTER_TABLE_LOCK_ENABLE, + HConstants.DEFAULT_TABLE_LOCK_ENABLE)) { + int schemaChangeLockTimeoutMs = conf.getInt(HConstants.TABLE_LOCK_TIMEOUT_MS, + HConstants.DEFAULT_TABLE_LOCK_TIMEOUT_MS); + return new ZKTableLockManager(zkWatcher, serverName, schemaChangeLockTimeoutMs); + } else { + return new NullTableLockManager(); + } + } + + /** + * A null implementation. + */ + public static class NullTableLockManager extends TableLockManager { + @Override + public void lockTable(byte[] tableName, String purpose) throws IOException { + } + @Override + public void unlockTable(byte[] tableName) throws IOException { + } + } + + /** + * ZooKeeper-based TableLockManager. + */ + private static class ZKTableLockManager extends TableLockManager { + /** + * Tables that are currently locked by this instance. Allows locks to + * be released by table name. + */ + private final ConcurrentMap<String, HLock> acquiredTableLocks; + + private final ServerName serverName; + + private final ZooKeeperWatcher zkWatcher; + + private final int lockTimeoutMs; + + /** + * Initialize a new manager for table-level locks. + * @param zkWatcher + * @param serverName Address of the server responsible for acquiring and + * releasing the table-level locks + * @param lockTimeoutMs Timeout (in milliseconds) for acquiring a lock for a + * given table, or -1 for no timeout + */ + public ZKTableLockManager(ZooKeeperWatcher zkWatcher, + ServerName serverName, int lockTimeoutMs) { + this.zkWatcher = zkWatcher; + this.serverName = serverName; + this.lockTimeoutMs = lockTimeoutMs; + this.acquiredTableLocks = + new ConcurrentHashMap<String, HLock>(); + } + + /** + * Lock a table, given a purpose.
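Wiring the factory up from configuration might look like the following sketch; the timeout value is an example, and zkWatcher and serverName are assumed to already be in scope (in HMaster they are the master's ZooKeeperWatcher and ServerName):

Configuration conf = HBaseConfiguration.create();
conf.setBoolean(HConstants.MASTER_TABLE_LOCK_ENABLE, true);  // "hbase.table.lock.enable"
conf.setInt(HConstants.TABLE_LOCK_TIMEOUT_MS, 300 * 1000);   // example: 5 minutes
TableLockManager lockManager =
    TableLockManager.createTableLockManager(conf, zkWatcher, serverName);

Disabling the feature yields the NullTableLockManager above, so callers never need a null check.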
+ * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @throws TableLockTimeoutException If unable to acquire a lock within a + * specified time period (if any) + * @throws IOException If unrecoverable ZooKeeper error occurs + */ + public void lockTable(byte[] tableName, String purpose) + throws IOException { + String tableNameStr = Bytes.toString(tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to acquire table lock on :" + tableNameStr + " for:" + purpose); + } + HLock lock = createTableLock(tableNameStr, purpose); + try { + if (lockTimeoutMs == -1) { + // Wait indefinitely + lock.acquire(); + } else { + if (!lock.tryAcquire(lockTimeoutMs)) { + throw new TableLockTimeoutException("Timed out acquiring " + + "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms."); + } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e); + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted acquiring a lock"); + } + + if (acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) { + // This should never execute if DistributedLock is implemented + // correctly. + LOG.error("Lock for " + tableNameStr + " acquired by multiple owners!"); + LOG.error("Currently held locks: " + acquiredTableLocks); + throw new IllegalStateException("Lock for " + tableNameStr + + " was acquired by multiple owners!"); + } + } + + private HLock createTableLock(String tableName, String purpose) { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + //TODO: pb this + byte[] lockMetadata = Bytes.toBytes("[Table = " + tableName + + "\nOwner server address = " + serverName + + "\nOwner thread id = " + Thread.currentThread().getId() + + "\nPurpose = " + purpose + "]"); + return new HReadWriteLockImpl(zkWatcher, tableLockZNode, + METADATA_HANDLER).writeLock(lockMetadata); + } + + public void unlockTable(byte[] tableName) + throws IOException { + String tableNameStr = Bytes.toString(tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to release table lock on :" + tableNameStr); + } + HLock lock = acquiredTableLocks.get(tableNameStr); + if (lock == null) { + throw new IllegalStateException("Table " + tableNameStr + + " is not locked!"); + } + + try { + acquiredTableLocks.remove(tableNameStr); + lock.release(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while releasing a lock for " + tableNameStr); + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 0130f51..e766d75 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.Threads; @@ -65,11 +67,12 @@ public class 
CreateTableHandler extends EventHandler { private Configuration conf; private final AssignmentManager assignmentManager; private final CatalogTracker catalogTracker; + private final TableLockManager tableLockManager; private final HRegionInfo [] newRegions; public CreateTableHandler(Server server, MasterFileSystem fileSystemManager, HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) + MasterServices masterServices) throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { super(server, EventType.C_M_CREATE_TABLE); @@ -77,8 +80,9 @@ public class CreateTableHandler extends EventHandler { this.hTableDescriptor = hTableDescriptor; this.conf = conf; this.newRegions = newRegions; - this.catalogTracker = catalogTracker; - this.assignmentManager = assignmentManager; + this.catalogTracker = masterServices.getCatalogTracker(); + this.assignmentManager = masterServices.getAssignmentManager(); + this.tableLockManager = masterServices.getTableLockManager(); int timeout = conf.getInt("hbase.client.catalog.timeout", 10000); // Need META availability to create a table @@ -126,8 +130,14 @@ public class CreateTableHandler extends EventHandler { @Override public void process() { String tableName = this.hTableDescriptor.getNameAsString(); + LOG.info("Attempting to create the table " + tableName); + + //acquire table lock before calling the coprocessors + + if (!tableLockManager.lockTableFailSilently(hTableDescriptor.getName(), "create")) { + return; + } try { - LOG.info("Attempting to create the table " + tableName); MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost(); if (cpHost != null) { cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions); @@ -140,6 +150,8 @@ public class CreateTableHandler extends EventHandler { LOG.error("Error trying to create the table " + tableName, e); } catch (KeeperException e) { LOG.error("Error trying to create the table " + tableName, e); + } finally { + tableLockManager.unlockTableFailSilently(hTableDescriptor.getName()); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java index 3a6b629..66a3c4c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; import org.cloudera.htrace.Trace; @@ -50,15 +51,17 @@ public class DisableTableHandler extends EventHandler { private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; public DisableTableHandler(Server server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) + TableLockManager tableLockManager, boolean skipTableStateCheck) throws TableNotFoundException, TableNotEnabledException, IOException { super(server, EventType.C_M_DISABLE_TABLE); 
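Each handler's process() now follows the same shape, as CreateTableHandler.process() above shows: take the table lock before running the coprocessor hooks and the operation itself, give up quietly if the lock cannot be taken, and release in a finally block. Schematically, with the purpose string varying per handler ("create", "disable", "enable", or the event type):

public void process() {
  if (!tableLockManager.lockTableFailSilently(tableName, "disable")) {
    return;  // another operation holds the table lock
  }
  try {
    // ... pre-coprocessor hook, the actual table operation, post hook ...
  } finally {
    tableLockManager.unlockTableFailSilently(tableName);
  }
}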
this.tableName = tableName; this.tableNameStr = Bytes.toString(this.tableName); this.assignmentManager = assignmentManager; + this.tableLockManager = tableLockManager; // Check if table exists // TODO: do we want to keep this in-memory as well? i guess this is // part of old master rewrite, schema to zk to check for table @@ -100,6 +103,9 @@ public class DisableTableHandler extends EventHandler { public void process() { try { LOG.info("Attempting to disable table " + this.tableNameStr); + if (!tableLockManager.lockTableFailSilently(tableName, "disable")) { + return; + } MasterCoprocessorHost cpHost = ((HMaster) this.server) .getCoprocessorHost(); if (cpHost != null) { @@ -113,6 +119,8 @@ public class DisableTableHandler extends EventHandler { LOG.error("Error trying to disable table " + this.tableNameStr, e); } catch (KeeperException e) { LOG.error("Error trying to disable table " + this.tableNameStr, e); + } finally { + tableLockManager.unlockTableFailSilently(tableName); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java index f46c870..b825535 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.zookeeper.KeeperException; @@ -55,18 +56,20 @@ public class EnableTableHandler extends EventHandler { private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; private final CatalogTracker ct; private boolean retainAssignment = false; public EnableTableHandler(Server server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) + TableLockManager tableLockManager, boolean skipTableStateCheck) throws TableNotFoundException, TableNotDisabledException, IOException { super(server, EventType.C_M_ENABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(tableName); this.ct = catalogTracker; this.assignmentManager = assignmentManager; + this.tableLockManager = tableLockManager; this.retainAssignment = skipTableStateCheck; // Check if table exists if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { @@ -106,6 +109,9 @@ public class EnableTableHandler extends EventHandler { public void process() { try { LOG.info("Attempting to enable the table " + this.tableNameStr); + if (!tableLockManager.lockTableFailSilently(tableName, "enable")) { + return; + } MasterCoprocessorHost cpHost = ((HMaster) this.server) .getCoprocessorHost(); if (cpHost != null) { @@ -121,6 +127,8 @@ public class EnableTableHandler extends EventHandler { LOG.error("Error trying to enable the table " + this.tableNameStr, e); } catch (InterruptedException e) { LOG.error("Error trying to enable the table " + this.tableNameStr, e); + } finally { + tableLockManager.unlockTableFailSilently(tableName); } } diff --git 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java index c6f70e9..a2a06e5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java @@ -41,7 +41,7 @@ public class TableDeleteFamilyHandler extends TableEventHandler { public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices); HTableDescriptor htd = getTableDescriptor(); this.familyName = hasColumnFamily(htd, familyName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index aedfa96..c1badbf 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -93,6 +93,11 @@ public abstract class TableEventHandler extends EventHandler { try { LOG.info("Handling table operation " + eventType + " on table " + Bytes.toString(tableName)); + + if (!masterServices.getTableLockManager().lockTableFailSilently( + tableName, eventType.toString())) { + return; + } List hris = MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); @@ -112,6 +117,7 @@ public abstract class TableEventHandler extends EventHandler { } catch (KeeperException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } finally { + masterServices.getTableLockManager().unlockTableFailSilently(tableName); // notify the waiting thread that we're done persisting the request setPersist(); } @@ -141,7 +147,7 @@ public abstract class TableEventHandler extends EventHandler { reRegions.add(hri); serverToRegions.get(rsLocation).add(hri); } - + LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size() + " region servers."); this.masterServices.getAssignmentManager().setRegionsToReopen(reRegions); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 4b355f7..1dc528e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -43,24 +43,24 @@ import org.apache.zookeeper.data.Stat; /** * A zookeeper that can handle 'recoverable' errors. - * To handle recoverable errors, developers need to realize that there are two - * classes of requests: idempotent and non-idempotent requests. Read requests - * and unconditional sets and deletes are examples of idempotent requests, they - * can be reissued with the same results. - * (Although, the delete may throw a NoNodeException on reissue its effect on - * the ZooKeeper state is the same.) Non-idempotent requests need special - * handling, application and library writers need to keep in mind that they may - * need to encode information in the data or name of znodes to detect - * retries. A simple example is a create that uses a sequence flag. 
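The retry recipe described in that comment can be sketched against the raw ZooKeeper client as follows; the method name and the id format are illustrative, not part of this patch:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/** Illustrative only: retry-safe sequential create, embedding a unique id. */
String createSequentialWithRetry(ZooKeeper zk, String parent, String id,
    byte[] data) throws KeeperException, InterruptedException {
  String prefix = parent + "/x-" + id + "-";  // e.g. id "352" gives x-352-
  while (true) {
    try {
      return zk.create(prefix, data, Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT_SEQUENTIAL);
    } catch (KeeperException.ConnectionLossException e) {
      // The create may have succeeded just before the connection dropped;
      // look for a child carrying our id before reissuing it.
      for (String child : zk.getChildren(parent, false)) {
        if (child.startsWith("x-" + id + "-")) {
          return parent + "/" + child;  // the earlier attempt went through
        }
      }
      // No child with our id: the create never happened, safe to retry.
    }
  }
}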
- * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection - * loss exception, that process will reissue another - * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a - * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be - * that x-109 was the result of the previous create, so the process actually - * owns both x-109 and x-111. An easy way around this is to use "x-process id-" + * To handle recoverable errors, developers need to realize that there are two + * classes of requests: idempotent and non-idempotent requests. Read requests + * and unconditional sets and deletes are examples of idempotent requests, they + * can be reissued with the same results. + * (Although, the delete may throw a NoNodeException on reissue its effect on + * the ZooKeeper state is the same.) Non-idempotent requests need special + * handling, application and library writers need to keep in mind that they may + * need to encode information in the data or name of znodes to detect + * retries. A simple example is a create that uses a sequence flag. + * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection + * loss exception, that process will reissue another + * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a + * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be + * that x-109 was the result of the previous create, so the process actually + * owns both x-109 and x-111. An easy way around this is to use "x-process id-" * when doing the create. If the process is using an id of 352, before reissuing - * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", - * "x-352-109", x-333-110". The process will know that the original create + * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", + * "x-352-109", x-333-110". The process will know that the original create * succeeded an the znode it created is "x-352-109". * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" */ @@ -93,7 +93,7 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) + Watcher watcher, int maxRetries, int retryIntervalMillis) throws IOException { this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); this.retryCounterFactory = @@ -337,7 +337,7 @@ public class RecoverableZooKeeper { /** * setData is NOT an idempotent operation. Retry may cause BadVersion Exception - * Adding an identifier field into the data to check whether + * Adding an identifier field into the data to check whether * badversion is caused by the result of previous correctly setData * @return Stat instance */ @@ -384,17 +384,17 @@ public class RecoverableZooKeeper { /** *
- * NONSEQUENTIAL create is idempotent operation. + * NONSEQUENTIAL create is idempotent operation. * Retry before throwing exceptions. * But this function will not throw the NodeExist exception back to the * application. *
*
- * But SEQUENTIAL is NOT idempotent operation. It is necessary to add - * identifier to the path to verify, whether the previous one is successful + * But SEQUENTIAL is NOT idempotent operation. It is necessary to add + * identifier to the path to verify, whether the previous one is successful * or not. *
- * + * * @return Path */ public String create(String path, byte[] data, List acl, @@ -411,12 +411,12 @@ public class RecoverableZooKeeper { return createSequential(path, newData, acl, createMode); default: - throw new IllegalArgumentException("Unrecognized CreateMode: " + + throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode); } } - private String createNonSequential(String path, byte[] data, List acl, + private String createNonSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. @@ -429,14 +429,14 @@ public class RecoverableZooKeeper { if (isRetry) { // If the connection was lost, there is still a possibility that // we have successfully created the node at our previous attempt, - // so we read the node and compare. + // so we read the node and compare. byte[] currentData = zk.getData(path, false, null); if (currentData != null && - Bytes.compareTo(currentData, data) == 0) { + Bytes.compareTo(currentData, data) == 0) { // We successfully created a non-sequential node return path; } - LOG.error("Node " + path + " already exists with " + + LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) + ", could not write " + Bytes.toStringBinary(data)); throw e; @@ -460,8 +460,8 @@ public class RecoverableZooKeeper { isRetry = true; } } - - private String createSequential(String path, byte[] data, + + private String createSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); @@ -513,7 +513,7 @@ public class RecoverableZooKeeper { } return null; } - + public byte[] removeMetaData(byte[] data) { if(data == null || data.length == 0) { return data; @@ -582,7 +582,7 @@ public class RecoverableZooKeeper { * @param prefixes the prefixes to include in the result * @return list of every element that starts with one of the prefixes */ - private static List filterByPrefix(List nodes, + private static List filterByPrefix(List nodes, String... prefixes) { List lockChildren = new ArrayList(); for (String child : nodes){ @@ -595,4 +595,8 @@ public class RecoverableZooKeeper { } return lockChildren; } + + public String getIdentifier() { + return identifier; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 1a80b58..9e55e6e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -249,6 +249,29 @@ public class ZKUtil { } /** + * Watch the specified znode, but only if exists. Useful when watching + * for deletions. Uses .getData() (and handles NoNodeException) instead + * of .exists() to accomplish this, as .getData() will only set a watch if + * the znode exists. 
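The reason getData() is used here: exists() registers a watch even when the znode is absent, and that watch would fire on a later create rather than on the deletion being waited for. The same idea against the raw client, for illustration:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

/** Illustrative: returns true (with a watch set) only if the znode exists. */
boolean watchOnlyIfExists(ZooKeeper zk, String znode)
    throws KeeperException, InterruptedException {
  try {
    zk.getData(znode, true, null);  // sets the default watcher on success
    return true;
  } catch (KeeperException.NoNodeException e) {
    return false;  // znode absent, and no stray watch left behind
  }
}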
+ * @param znode + * @return + * @throws KeeperException + */ + public static boolean setWatchIfNodeExists(ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + zkw.getRecoverableZooKeeper().getData(znode, true, null); + return true; + } catch (NoNodeException e) { + return false; + } catch (InterruptedException e) { + LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); + zkw.interruptedException(e); + return false; + } + } + + /** * Check if the specified node exists. Sets no watches. * * @param zkw zk reference @@ -832,6 +855,29 @@ public class ZKUtil { return true; } + public static String createZNodeIfNotExistsNoWatch(ZooKeeperWatcher zkw, String zNode, + byte[] data, CreateMode createMode) throws KeeperException { + + String createdZNode = null; + try { + waitForZKConnectionIfAuthenticating(zkw); + createdZNode = zkw.getRecoverableZooKeeper().create(zNode, data, + Ids.OPEN_ACL_UNSAFE, createMode); + } catch (KeeperException.NodeExistsException nee) { + try { + zkw.getRecoverableZooKeeper().exists(zNode, zkw); + } catch (InterruptedException e) { + zkw.interruptedException(e); + return null; + } + return null; + } catch (InterruptedException e) { + zkw.interruptedException(e); + return null; + } + return createdZNode != null ? createdZNode : zNode; + } + /** * Creates the specified node with the specified data and watches it. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 128a0d9..7c187e5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -103,6 +103,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String splitLogZNode; // znode containing the state of the load balancer public String balancerZNode; + // znode containing the lock for the tables + public String tableLockZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = @@ -166,6 +168,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); + ZKUtil.createAndFailSilent(this, tableLockZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -215,6 +218,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); balancerZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.balancer", "balancer")); + tableLockZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.tableLock", "tableLock")); } /** @@ -234,6 +239,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { listeners.add(0, listener); } + public void unregisterListener(ZooKeeperListener listener) { + listeners.remove(listener); + } + /** * Get the connection to ZooKeeper. 
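With the defaults above (base znode "/hbase", "zookeeper.znode.tableLock" left at "tableLock"), the znodes involved in one table lock are laid out roughly as follows; the table name and sequence number are invented for illustration:

/hbase
  /tableLock                <- zkw.tableLockZNode, created at startup
    /MyTable                <- per-table parent of the lock children
      /write-0000000000     <- PERSISTENT_SEQUENTIAL znode created via
                               createZNodeIfNotExistsNoWatch(); its data
                               is the serialized lock metadata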
* @return connection reference to zookeeper @@ -377,10 +386,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { if (this.abortable != null) this.abortable.abort(msg, new KeeperException.SessionExpiredException()); break; - + case ConnectedReadOnly: - break; - + break; + default: throw new IllegalStateException("Received event is not valid."); } @@ -459,7 +468,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public void abort(String why, Throwable e) { this.abortable.abort(why, e); } - + @Override public boolean isAborted() { return this.abortable.isAborted(); @@ -471,4 +480,5 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String getMasterAddressZNode() { return this.masterAddressZNode; } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/AcquiredLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/AcquiredLock.java new file mode 100644 index 0000000..8462f7c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/AcquiredLock.java @@ -0,0 +1,55 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper.lock; + +/** + * Represents information about an acquired lock. Used by BaseDistributedLock. + */ +public class AcquiredLock { + + private final String path; + private final int version; + + /** + * Store information about a lock. + * @param path The path to a lock's ZNode + * @param version The current version of the lock's ZNode + */ + public AcquiredLock(String path, int version) { + this.path = path; + this.version = version; + } + + public String getPath() { + return path; + } + + public int getVersion() { + return version; + } + + @Override + public String toString() { + return "AcquiredLockInfo{" + + "path='" + path + '\'' + + ", version=" + version + + '}'; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/BaseHLockImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/BaseHLockImpl.java new file mode 100644 index 0000000..af1cb60 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/BaseHLockImpl.java @@ -0,0 +1,270 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper.lock; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HLock; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.data.Stat; + +/** + * ZooKeeper based HLock implementation. Based on the Shared Locks recipe. + * (see: + * + * ZooKeeper Recipes and Solutions + * ) + */ +public abstract class BaseHLockImpl implements HLock { + + private static final Log LOG = LogFactory.getLog(BaseHLockImpl.class); + + /** ZNode prefix used by processes acquiring reader locks */ + protected static final String READ_LOCK_CHILD_NODE = "read-"; + + /** ZNode prefix used by processes acquiring writer locks */ + protected static final String WRITE_LOCK_CHILD_NODE = "write-"; + + protected final ZooKeeperWatcher zkWatcher; + protected final String parentLockNode; + protected final String fullyQualifiedZNode; + protected final byte[] metadata; + protected final MetadataHandler handler; + + // If we acquire a lock, update this field + protected final AtomicReference acquiredLock = + new AtomicReference(null); + + /** + * Called by implementing classes. 
+ * @param zkWrapper + * @param parentLockNode The lock ZNode path + * @param metadata + * @param handler + * @param childNode The prefix for child nodes created under the parent + */ + protected BaseHLockImpl(ZooKeeperWatcher zkWatcher, + String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) { + this.zkWatcher = zkWatcher; + this.parentLockNode = parentLockNode; + this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode); + this.metadata = metadata; + this.handler = handler; + try { + ZKUtil.createWithParents(zkWatcher, parentLockNode); + } catch (KeeperException ex) { + LOG.warn("Failed to create znode:" + parentLockNode, ex); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void acquire() throws IOException, InterruptedException { + tryAcquire(-1); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean tryAcquire(long timeoutMs) + throws IOException, InterruptedException { + boolean hasTimeout = timeoutMs != -1; + long waitUntilMs = + hasTimeout ?EnvironmentEdgeManager.currentTimeMillis() + timeoutMs : -1; + String createdZNode = createLockZNode(); + while (true) { + List children; + try { + children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + String pathToWatch; + if ((pathToWatch = getLockPath(createdZNode, children)) == null) { + break; + } else { + CountDownLatch deletedLatch = new CountDownLatch(1); + String zkPathToWatch = + ZKUtil.joinZNode(parentLockNode, pathToWatch); + DeletionListener deletionListener = + new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch); + zkWatcher.registerListener(deletionListener); + try { + if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) { + // Wait for the watcher to fire + if (hasTimeout) { + long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTimeMillis(); + if (remainingMs < 0 || + !deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) { + LOG.warn("Unable to acquire the lock in " + timeoutMs + + " milliseconds."); + try { + ZKUtil.deleteNode(zkWatcher, createdZNode); + } catch (KeeperException e) { + LOG.warn("Unable to remove ZNode " + createdZNode); + } + return false; + } + } else { + deletedLatch.await(); + } + if (deletionListener.hasException()) { + Throwable t = deletionListener.getException(); + throw new IOException("Exception in the watcher", t); + } + } + } catch (KeeperException e) { + throw new IOException("Unexpected ZooKeeper exception", e); + } finally { + zkWatcher.unregisterListener(deletionListener); + } + } + } + updateAcquiredLock(createdZNode); + LOG.debug("Successfully acquired a lock for " + fullyQualifiedZNode); + return true; + } + + private String createLockZNode() { + try { + return ZKUtil.createZNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode, + metadata, CreateMode.PERSISTENT_SEQUENTIAL); + } catch (KeeperException ex) { + LOG.warn("Failed to create znode: " + fullyQualifiedZNode, ex); + return null; + } + } + + /** + * Update state as to indicate that a lock is held + * @param createdZNode The lock znode + * @throws IOException If an unrecoverable ZooKeeper error occurs + */ + protected void updateAcquiredLock(String createdZNode) throws IOException { + Stat stat = new Stat(); + byte[] data = null; + try { + data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat); + } catch (KeeperException ex) { + LOG.warn("Cannot getData for znode:" + 
createdZNode, ex); + } + if (data == null) { + LOG.error("Can't acquire a lock on a non-existent node " + createdZNode); + throw new IllegalStateException("ZNode " + createdZNode + + "no longer exists!"); + } + AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion()); + if (!acquiredLock.compareAndSet(null, newLock)) { + LOG.error("The lock " + fullyQualifiedZNode + + " has already been acquired by another process!"); + throw new IllegalStateException(fullyQualifiedZNode + + " is held by another process"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void release() throws IOException, InterruptedException { + AcquiredLock lock = acquiredLock.get(); + if (lock == null) { + LOG.error("Cannot release " + lock + + ", process does not have a lock for " + fullyQualifiedZNode); + throw new IllegalStateException("No lock held for " + fullyQualifiedZNode); + } + try { + if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) { + ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion()); + if (!acquiredLock.compareAndSet(lock, null)) { + LOG.debug("Current process no longer holds " + lock + " for " + + fullyQualifiedZNode); + throw new IllegalStateException("Not holding a lock for " + + fullyQualifiedZNode +"!"); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully released " + fullyQualifiedZNode); + } + } catch (BadVersionException e) { + throw new IllegalStateException(e); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /** + * Process metadata stored in a ZNode using a callback object passed to + * this instance. + *
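The versioned delete in release() is what makes the BadVersionException path above possible: the znode is deleted only if it is still at the version recorded when the lock was acquired, so a node that was somehow replaced in the meantime is left alone. Against the raw client the guard looks like this (illustrative):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/** Illustrative: delete the lock znode only if it is still our version. */
void releaseLockZNode(ZooKeeper zk, String lockZNode, int acquiredVersion)
    throws KeeperException, InterruptedException {
  Stat stat = zk.exists(lockZNode, false);
  if (stat != null) {
    // Throws KeeperException.BadVersionException if the znode was
    // modified after the version we recorded at acquisition time.
    zk.delete(lockZNode, acquiredVersion);
  }
}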

+ * See {@link HWriteLockImpl.MetadataHandler#handleMetadata(byte[])} + * @param lockZNode The node holding the metadata + * @return True if metadata was ready and processed + * @throws IOException If an unexpected ZooKeeper error occurs + * @throws InterruptedException If interrupted when reading the metadata + */ + protected boolean handleLockMetadata(String lockZNode) + throws IOException, InterruptedException { + byte[] metadata = null; + try { + metadata = ZKUtil.getData(zkWatcher, lockZNode); + } catch (KeeperException ex) { + LOG.warn("Cannot getData for znode:" + lockZNode, ex); + } + if (metadata == null) { + return false; + } + if (handler != null) { + handler.handleMetadata(metadata); + } + return true; + } + + /** + * Determine based on a list of children under a ZNode, whether or not a + * process which created a specified ZNode has obtained a lock. If a lock is + * not obtained, return the path that we should watch awaiting its deletion. + * Otherwise, return null. + * This method is abstract as the logic for determining whether or not a + * lock is obtained depends on the type of lock being implemented. + * @param myZNode The ZNode created by the process attempting to acquire + * a lock + * @param children List of all child ZNodes under the lock's parent ZNode + * @return The path to watch, or null if myZNode can represent a correctly + * acquired lock. + */ + protected abstract String getLockPath(String myZNode, List children) + throws IOException, InterruptedException; +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java new file mode 100644 index 0000000..0024642 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java @@ -0,0 +1,101 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper.lock; + +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * A ZooKeeper watcher meant to detect deletions of ZNodes. + */ +public class DeletionListener extends ZooKeeperListener { + + private static final Log LOG = LogFactory.getLog(DeletionListener.class); + + private final String pathToWatch; + private final CountDownLatch deletedLatch; + + private volatile Throwable exception; + + /** + * Create a new instance of the deletion watcher. 
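The base class above is the standard ZooKeeper lock recipe: create a PERSISTENT_SEQUENTIAL
child under the lock znode, then block on the deletion of the closest conflicting
predecessor. A minimal caller-side sketch of the API (illustrative only; the watcher, the
znode path, and the metadata payload are made-up values, and exception handling is omitted):

    // Hypothetical caller; HLock and HWriteLockImpl are introduced by this patch.
    HLock writeLock = new HWriteLockImpl(zkWatcher, "/hbase/table-lock/t1",
        Bytes.toBytes("alter table t1"), null);
    if (writeLock.tryAcquire(30 * 1000)) {  // wait up to 30 seconds
      try {
        // ... perform the schema change while holding the exclusive lock ...
      } finally {
        writeLock.release();
      }
    }
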
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java
new file mode 100644
index 0000000..0024642
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/DeletionListener.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * A ZooKeeper watcher meant to detect deletions of ZNodes.
+ */
+public class DeletionListener extends ZooKeeperListener {
+
+  private static final Log LOG = LogFactory.getLog(DeletionListener.class);
+
+  private final String pathToWatch;
+  private final CountDownLatch deletedLatch;
+
+  private volatile Throwable exception;
+
+  /**
+   * Create a new instance of the deletion watcher.
+   * @param zkWatcher The ZooKeeperWatcher instance
+   * @param pathToWatch (Fully qualified) ZNode path that we are waiting to
+   *                    be deleted.
+   * @param deletedLatch Count down on this latch when deletion has occurred.
+   */
+  public DeletionListener(ZooKeeperWatcher zkWatcher, String pathToWatch,
+      CountDownLatch deletedLatch) {
+    super(zkWatcher);
+    this.pathToWatch = pathToWatch;
+    this.deletedLatch = deletedLatch;
+    exception = null;
+  }
+
+  /**
+   * Check if an exception has occurred when re-setting the watch.
+   * @return True if we were unable to re-set a watch on a ZNode due to
+   *         an exception.
+   */
+  public boolean hasException() {
+    return exception != null;
+  }
+
+  /**
+   * Get the last exception which has occurred when re-setting the watch.
+   * Use hasException() to check whether or not an exception has occurred.
+   * @return The last exception observed when re-setting the watch.
+   */
+  public Throwable getException() {
+    return exception;
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (!path.equals(pathToWatch)) {
+      return;
+    }
+    try {
+      if (!(ZKUtil.setWatchIfNodeExists(watcher, pathToWatch))) {
+        deletedLatch.countDown();
+      }
+    } catch (Throwable t) {
+      exception = t;
+      deletedLatch.countDown();
+      LOG.error("Error when re-setting the watch on " + pathToWatch, t);
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (!path.equals(pathToWatch)) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing delete on " + pathToWatch);
+    }
+    deletedLatch.countDown();
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadLockImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadLockImpl.java
new file mode 100644
index 0000000..1843c7c
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadLockImpl.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * ZooKeeper based read lock: does not exclude other read locks, but excludes
+ * and is excluded by write locks.
+ */
+public class HReadLockImpl extends BaseHLockImpl {
+
+  private static final Log LOG = LogFactory.getLog(HReadLockImpl.class);
+
+  /** Temporary until this is a constant in ZK */
+  private static final char ZNODE_PATH_SEPARATOR = '/';
+
+  public HReadLockImpl(ZooKeeperWatcher zooKeeperWatcher,
+      String name, byte[] metadata, MetadataHandler handler) {
+    super(zooKeeperWatcher, name, metadata, handler, READ_LOCK_CHILD_NODE);
+  }
+
+  /**
+   * Check if a child znode represents a write lock.
+   * @param child The child znode we want to check.
+   * @return True if the child znode was created by a write lock attempt.
+   */
+  private static boolean isWriteChild(String child) {
+    int idx = child.lastIndexOf(ZNODE_PATH_SEPARATOR);
+    String suffix = child.substring(idx + 1);
+    return suffix.startsWith(WRITE_LOCK_CHILD_NODE);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected String getLockPath(String createdZNode, List<String> children)
+      throws IOException, InterruptedException {
+    TreeSet<String> writeChildren = new TreeSet<String>(
+        new ZNodeComparator(zkWatcher.getRecoverableZooKeeper().getIdentifier()));
+    for (String child : children) {
+      if (isWriteChild(child)) {
+        writeChildren.add(child);
+      }
+    }
+    if (writeChildren.isEmpty()) {
+      return null;
+    }
+    SortedSet<String> lowerChildren = writeChildren.headSet(createdZNode);
+    if (lowerChildren.isEmpty()) {
+      return null;
+    }
+    String pathToWatch = lowerChildren.last();
+    String nodeHoldingLock = lowerChildren.first();
+    String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
+    try {
+      handleLockMetadata(znode);
+    } catch (IOException e) {
+      LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
+    }
+    return pathToWatch;
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadWriteLockImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadWriteLockImpl.java
new file mode 100644
index 0000000..a212c22
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HReadWriteLockImpl.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import org.apache.hadoop.hbase.HLock.MetadataHandler;
+import org.apache.hadoop.hbase.HReadWriteLock;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * ZooKeeper based implementation of HReadWriteLock.
+ */
+public class HReadWriteLockImpl implements HReadWriteLock {
+
+  private final ZooKeeperWatcher zkWatcher;
+  private final String name;
+  private final MetadataHandler handler;
+
+  /**
+   * Creates an HReadWriteLockImpl instance.
+   * @param zkWatcher The ZooKeeper watcher to use
+   * @param znode ZNode path for the lock
+   * @param handler An object that will handle de-serializing and processing
+   *                the metadata associated with reader or writer locks
+   *                created by this object, or null if none desired
+   */
+  public HReadWriteLockImpl(ZooKeeperWatcher zkWatcher, String znode,
+      MetadataHandler handler) {
+    this.zkWatcher = zkWatcher;
+    this.name = znode;
+    this.handler = handler;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public HReadLockImpl readLock(byte[] metadata) {
+    return new HReadLockImpl(zkWatcher, name, metadata, handler);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public HWriteLockImpl writeLock(byte[] metadata) {
+    return new HWriteLockImpl(zkWatcher, name, metadata, handler);
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HWriteLockImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HWriteLockImpl.java
new file mode 100644
index 0000000..4cfb469
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/HWriteLockImpl.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * ZooKeeper based write lock: excludes both read locks and other write locks.
+ */
+public class HWriteLockImpl extends BaseHLockImpl {
+
+  private static final Log LOG = LogFactory.getLog(HWriteLockImpl.class);
+
+  public HWriteLockImpl(ZooKeeperWatcher zooKeeperWatcher,
+      String name, byte[] metadata, MetadataHandler handler) {
+    super(zooKeeperWatcher, name, metadata, handler, WRITE_LOCK_CHILD_NODE);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected String getLockPath(String createdZNode, List<String> children)
+      throws IOException, InterruptedException {
+    TreeSet<String> sortedChildren = new TreeSet<String>(
+        new ZNodeComparator(zkWatcher.getRecoverableZooKeeper().getIdentifier()));
+    sortedChildren.addAll(children);
+    String pathToWatch = sortedChildren.lower(createdZNode);
+    if (pathToWatch != null) {
+      String nodeHoldingLock = sortedChildren.first();
+      String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
+      try {
+        handleLockMetadata(znode);
+      } catch (IOException e) {
+        LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
+      }
+    }
+    return pathToWatch;
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZNodeComparator.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZNodeComparator.java
new file mode 100644
index 0000000..97b36b2
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZNodeComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.util.Comparator;
+
+/**
+ * Orders lock child znode names by the ZooKeeper sequence id that follows
+ * the client identifier in the znode name.
+ */
+public class ZNodeComparator implements Comparator<String> {
+
+  private final String identifier;
+
+  public ZNodeComparator(String identifier) {
+    this.identifier = identifier;
+  }
+
+  /**
+   * Parses the child znode name and extracts its sequence id.
+   */
+  public static int getChildSequenceId(String childZNode, String identifier) {
+    int idx = childZNode.lastIndexOf(identifier);
+    String sequenceIdStr = childZNode.substring(idx + identifier.length());
+    return Integer.parseInt(sequenceIdStr);
+  }
+
+  @Override
+  public int compare(String zNode1, String zNode2) {
+    int seq1 = getChildSequenceId(zNode1, identifier);
+    int seq2 = getChildSequenceId(zNode2, identifier);
+    return seq1 - seq2;
+  }
+}
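The comparator assumes each child znode name ends with the client identifier followed by
the ten-digit sequence suffix ZooKeeper appends to PERSISTENT_SEQUENTIAL nodes; lower
sequence ids sort first, so earlier lock attempts win. A tiny worked example (the
identifier and znode name here are made up for illustration):

    // child prefix "write-" + identifier "host1,60000,1354912" + sequence "0000000003"
    int seq = ZNodeComparator.getChildSequenceId(
        "write-host1,60000,13549120000000003", "host1,60000,1354912");
    // seq == 3
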

diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
index 8758f9e..f333a51 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.util.Set;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,7 +94,7 @@ public abstract class MultithreadedTestUtil {
       stopped = s;
     }
   }
-  
+
   public void stop() throws Exception {
     synchronized (this) {
       stopped = true;
@@ -130,7 +133,7 @@ public abstract class MultithreadedTestUtil {
       this.stopped = true;
     }
   }
-  
+
   /**
    * A test thread that performs a repeating operation.
    */
@@ -138,13 +141,48 @@ public abstract class MultithreadedTestUtil {
     public RepeatingTestThread(TestContext ctx) {
       super(ctx);
     }
-    
+
     public final void doWork() throws Exception {
       while (ctx.shouldRun() && !stopped) {
         doAnAction();
       }
     }
-    
+
     public abstract void doAnAction() throws Exception;
   }
+
+  /**
+   * Verify that no assertions have failed inside a future.
+   * Used for unit tests that spawn threads. E.g.,
+   * <pre>
+   *   List&lt;Future&lt;Void&gt;&gt; results = Lists.newArrayList();
+   *   Future&lt;Void&gt; f = executor.submit(new Callable&lt;Void&gt;() {
+   *     public Void call() {
+   *       assertTrue(someMethod());
+   *       return null;
+   *     }
+   *   });
+   *   results.add(f);
+   *   assertOnFutures(results);
+   * </pre>
+   * @param threadResults A list of futures
+   * @param <T> The type of the results of the futures
+   * @throws InterruptedException If interrupted when waiting for a result
+   *                              from one of the futures
+   * @throws ExecutionException If an exception other than AssertionError
+   *                            occurs inside any of the futures
+   */
+  public static <T> void assertOnFutures(List<Future<T>> threadResults)
+      throws InterruptedException, ExecutionException {
+    for (Future<T> threadResult : threadResults) {
+      try {
+        threadResult.get();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause();
+        }
+        throw e;
+      }
+    }
+  }
+}
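The helper exists because a JUnit assertion that fails on a pool thread only surfaces as an
ExecutionException wrapping the AssertionError; re-throwing the cause makes the test itself
fail instead of silently passing. A self-contained usage sketch (hypothetical test code,
assuming an ExecutorService named executor and JUnit static imports):

    List<Future<Void>> results = Lists.newArrayList();
    for (int i = 0; i < 5; i++) {
      results.add(executor.submit(new Callable<Void>() {
        @Override
        public Void call() {
          // Any AssertionError thrown here is re-thrown by assertOnFutures
          assertTrue(System.currentTimeMillis() > 0);
          return null;
        }
      }));
    }
    MultithreadedTestUtil.assertOnFutures(results);
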
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 7f3bc12..3d7bfbe 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -181,7 +182,7 @@ public class TestAssignmentManager {
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
-   * @throws DeserializationException 
+   * @throws DeserializationException
    */
   @Test(timeout = 5000)
   public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
@@ -339,7 +340,7 @@ public class TestAssignmentManager {
    * from one server to another mocking regionserver responding over zk.
    * @throws IOException
    * @throws KeeperException
-   * @throws DeserializationException 
+   * @throws DeserializationException
    */
   @Test
   public void testBalance()
@@ -354,7 +355,7 @@ public class TestAssignmentManager {
         .getConfiguration());
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
-        this.serverManager, ct, balancer, executor, null);
+        this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
     am.failoverCleanupDone.set(true);
     try {
       // Make sure our new AM gets callbacks; once registered, can't unregister.
@@ -435,7 +436,7 @@ public class TestAssignmentManager {
    * To test closed region handler to remove rit and delete corresponding znode
    * if region in pending close or closing while processing shutdown of a region
    * server.(HBASE-5927).
-   * 
+   *
    * @throws KeeperException
    * @throws IOException
    * @throws ServiceException
@@ -446,12 +447,12 @@ public class TestAssignmentManager {
     testCaseWithPartiallyDisabledState(Table.State.DISABLING);
     testCaseWithPartiallyDisabledState(Table.State.DISABLED);
   }
-  
-  
+
+
   /**
    * To test if the split region is removed from RIT if the region was in SPLITTING state but the RS
    * has actually completed the splitting in META but went down. See HBASE-6070 and also HBASE-5806
-   * 
+   *
    * @throws KeeperException
    * @throws IOException
    */
@@ -462,7 +463,7 @@ public class TestAssignmentManager {
     // false indicate the region is not split
     testCaseWithSplitRegionPartial(false);
   }
-  
+
   private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException,
       IOException, NodeExistsException, InterruptedException, ServiceException {
     // Create and startup an executor. This is used by AssignmentManager
@@ -523,7 +524,7 @@ public class TestAssignmentManager {
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
-        this.serverManager, ct, balancer, executor, null);
+        this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
     // adding region to regions and servers maps.
     am.regionOnline(REGIONINFO, SERVERNAME_A);
     // adding region in pending close.
@@ -644,7 +645,7 @@ public class TestAssignmentManager {
         .getConfiguration());
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
-        this.serverManager, ct, balancer, null, null);
+        this.serverManager, ct, balancer, null, null, master.getTableLockManager());
     try {
       // First make sure my mock up basically works. Unassign a region.
       unassign(am, SERVERNAME_A, hri);
@@ -672,7 +673,7 @@ public class TestAssignmentManager {
    * Tests the processDeadServersAndRegionsInTransition should not fail with NPE
    * when it failed to get the children. Let's abort the system in this
    * situation
-   * @throws ServiceException 
+   * @throws ServiceException
    */
   @Test(timeout = 5000)
   public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE()
@@ -701,7 +702,7 @@ public class TestAssignmentManager {
     }
   }
   /**
-   * TestCase verifies that the regionPlan is updated whenever a region fails to open 
+   * TestCase verifies that the regionPlan is updated whenever a region fails to open
    * and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546).
    */
   @Test(timeout = 5000)
@@ -788,7 +789,7 @@ public class TestAssignmentManager {
       this.gate.set(true);
       return randomServerName;
     }
-    
+
     @Override
     public Map<ServerName, List<HRegionInfo>> retainAssignment(
         Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
@@ -829,7 +830,7 @@ public class TestAssignmentManager {
   /**
    * Test verifies whether assignment is skipped for regions of tables in DISABLING state during
    * clean cluster startup. See HBASE-6281.
-   * 
+   *
    * @throws KeeperException
    * @throws IOException
    * @throws Exception
@@ -875,7 +876,7 @@ public class TestAssignmentManager {
   /**
    * Test verifies whether all the enabling table regions assigned only once during master startup.
-   * 
+   *
    * @throws KeeperException
    * @throws IOException
    * @throws Exception
@@ -895,7 +896,8 @@ public class TestAssignmentManager {
     try {
       // set table in enabling state.
       am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString());
-      new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true)
+      new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(),
+          am, new NullTableLockManager(), true)
           .process();
       assertEquals("Number of assignments should be 1.", 1, assignmentCount);
       assertTrue("Table should be enabled.",
@@ -1038,7 +1040,7 @@ public class TestAssignmentManager {
     ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
     this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
     AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
-        server, manager, ct, this.balancer, executor);
+        server, manager, ct, this.balancer, executor, new NullTableLockManager());
     return am;
   }
@@ -1057,8 +1059,9 @@ public class TestAssignmentManager {
     public AssignmentManagerWithExtrasForTesting(
         final Server master, final ServerManager serverManager,
         final CatalogTracker catalogTracker, final LoadBalancer balancer,
-        final ExecutorService service) throws KeeperException, IOException {
-      super(master, serverManager, catalogTracker, balancer, service, null);
+        final ExecutorService service, final TableLockManager tableLockManager)
+        throws KeeperException, IOException {
+      super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager);
       this.es = service;
       this.ct = catalogTracker;
     }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 2d17820..c3b3a79 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import com.google.protobuf.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -73,6 +72,7 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 @Category(SmallTests.class)
@@ -229,6 +229,11 @@ public class TestCatalogJanitor {
     }
 
     @Override
+    public TableLockManager getTableLockManager() {
+      return null;
+    }
+
+    @Override
     public void abort(String why, Throwable e) {
       //no-op
     }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java
new file mode 100644
index 0000000..fd5b612
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestSchemaModificationLocks {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestSchemaModificationLocks.class);
+
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks");
+
+  private static final byte[] FAMILY = Bytes.toBytes("f1");
+
+  private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
+
+  private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final CountDownLatch deleteColumn = new CountDownLatch(1);
+  private static final CountDownLatch addColumn = new CountDownLatch(1);
+
+  public void prepareMiniCluster() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout = 60000)
+  public void testLockTimeoutException() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HConstants.TABLE_LOCK_TIMEOUT_MS, 3000);
+    prepareMiniCluster();
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
+        0, TEST_UTIL.getConfiguration());
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.deleteColumn(TABLE_NAME, FAMILY);
+        return null;
+      }
+    });
+
+    deleteColumn.await();
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    // This addColumn request will time out in the master-side handler while
+    // trying to acquire the table lock; the timeout does not surface as an
+    // RPC failure because the call is asynchronous.
+    admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+
+    shouldFinish.get();
+  }
+
+  public static class TestLockTimeoutExceptionMasterObserver
+      extends BaseMasterObserver {
+    @Override
+    public void preDeleteColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      deleteColumn.countDown();
+    }
+
+    @Override
+    public void postDeleteColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      Threads.sleep(10000);
+    }
+
+    @Override
+    public void preAddColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      fail("Add column should have timed out trying to acquire the table lock");
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testAlterAndDisable() throws Exception {
+    prepareMiniCluster();
+    // Send a request to alter a table, then sleep during
+    // the alteration phase. In the mean time, from another
+    // thread, send a request to disable, and then delete a table.
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
+        0, TEST_UTIL.getConfiguration());
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+        LOG.info("Added new column family");
+        HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
+        assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
+        return null;
+      }
+    });
+    Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.disableTable(TABLE_NAME);
+        assertTrue(admin.isTableDisabled(TABLE_NAME));
+        admin.deleteTable(TABLE_NAME);
+        assertFalse(admin.tableExists(TABLE_NAME));
+        return null;
+      }
+    });
+
+    try {
+      disableTableFuture.get();
+      alterTableFuture.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof AssertionError) {
+        throw (AssertionError) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
+    @Override
+    public void preAddColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      LOG.debug("addColumn called");
+      addColumn.countDown();
+    }
+
+    @Override
+    public void postAddColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      Threads.sleep(6000);
+      try {
+        ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
+      } catch (TableNotDisabledException expected) {
+        // The disable request is still blocked on the table lock, so the
+        // table must not be disabled yet; this is the expected outcome.
+        return;
+      } catch (IOException ex) {
+        // Fall through to fail below.
+      }
+      fail("Expected TableNotDisabledException: the table should still be " +
+          "enabled while the alter operation holds the table lock");
+    }
+
+    @Override
+    public void preDisableTable(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      try {
+        LOG.debug("Waiting for addColumn to be processed first");
+        // wait for addColumn to be processed first
+        addColumn.await();
+        LOG.debug("addColumn started, we can continue");
+      } catch (InterruptedException ex) {
+        LOG.warn("Sleep interrupted while waiting for addColumn countdown", ex);
+      }
+    }
+
+    @Override
+    public void postDisableTableHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      Threads.sleep(3000);
+    }
+  }
+}
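The tests above serialize two admin operations by having master observers count down static
latches. The handshake pattern itself is worth seeing in isolation (a self-contained sketch,
independent of any HBase API):

    final CountDownLatch firstOpStarted = new CountDownLatch(1);
    Thread firstOp = new Thread(new Runnable() {
      public void run() {
        firstOpStarted.countDown();  // signal: first operation has begun
        // ... long-running work while holding the table lock ...
      }
    });
    firstOp.start();
    firstOpStarted.await();          // second operation only starts after the first
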
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestHReadWriteLockImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestHReadWriteLockImpl.java
new file mode 100644
index 0000000..c26dd16
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestHReadWriteLockImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HLock.MetadataHandler;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestHReadWriteLockImpl {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestHReadWriteLockImpl.class);
+
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static final int NUM_THREADS = 10;
+
+  private static Configuration conf;
+
+  private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
+  private final ExecutorService executor =
+      Executors.newFixedThreadPool(NUM_THREADS,
+          new DaemonThreadFactory("TestDistributedReadWriteLock-"));
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniZKCluster();
+    conf.setInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT, 1000);
+    ZooKeeperWatcher zkw = getZooKeeperWatcher("setup");
+    ZKUtil.createWithParents(zkw, zkw.tableLockZNode);
+  }
+
+  @AfterClass
+  public static void afterAllTests() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @After
+  public void tearDown() {
+    executor.shutdown();
+  }
+
+  private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
+      throws IOException {
+    return TEST_UTIL.getZooKeeperWatcher();
+  }
+
+  @Test(timeout = 30000)
+  public void testWriteLockExcludesWriters() throws Exception {
+    final String testName = "testWriteLockExcludesWriters";
"testWriteLockExcludesWriters"; + final HReadWriteLockImpl readWriteLock = + getReadWriteLock(testName); + List> results = Lists.newArrayList(); + for (int i = 0; i < NUM_THREADS; ++i) { + final String threadDesc = testName + i; + results.add(executor.submit(new Callable() { + @Override + public Void call() throws IOException { + HWriteLockImpl writeLock = + readWriteLock.writeLock(Bytes.toBytes(threadDesc)); + try { + writeLock.acquire(); + try { + // No one else should hold the lock + assertTrue(isLockHeld.compareAndSet(false, true)); + Thread.sleep(1000); + // No one else should have released the lock + assertTrue(isLockHeld.compareAndSet(true, false)); + } finally { + isLockHeld.set(false); + writeLock.release(); + } + } catch (InterruptedException e) { + LOG.warn(threadDesc + " interrupted", e); + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + return null; + } + })); + + } + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 30000) + public void testReadLockDoesNotExcludeReaders() throws Exception { + final String testName = "testReadLockDoesNotExcludeReaders"; + final HReadWriteLockImpl readWriteLock = + getReadWriteLock(testName); + final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS); + final AtomicInteger locksHeld = new AtomicInteger(0); + List> results = Lists.newArrayList(); + for (int i = 0; i < NUM_THREADS; ++i) { + final String threadDesc = testName + i; + results.add(executor.submit(new Callable() { + @Override + public Void call() throws Exception { + HReadLockImpl readLock = + readWriteLock.readLock(Bytes.toBytes(threadDesc)); + readLock.acquire(); + try { + locksHeld.incrementAndGet(); + locksAcquiredLatch.countDown(); + Thread.sleep(1000); + } finally { + readLock.release(); + locksHeld.decrementAndGet(); + } + return null; + } + })); + } + locksAcquiredLatch.await(); + assertEquals(locksHeld.get(), NUM_THREADS); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 3000) + public void testReadLockExcludesWriters() throws Exception { + // Submit a read lock request first + // Submit a write lock request second + final String testName = "testReadLockExcludesWriters"; + List> results = Lists.newArrayList(); + final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1); + Callable acquireReadLock = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-acquireReadLock"; + HReadLockImpl readLock = + getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc)); + readLock.acquire(); + try { + assertTrue(isLockHeld.compareAndSet(false, true)); + readLockAcquiredLatch.countDown(); + Thread.sleep(1000); + } finally { + isLockHeld.set(false); + readLock.release(); + } + return null; + } + }; + Callable acquireWriteLock = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-acquireWriteLock"; + HWriteLockImpl writeLock = + getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc)); + readLockAcquiredLatch.await(); + assertTrue(isLockHeld.get()); + writeLock.acquire(); + try { + assertFalse(isLockHeld.get()); + } finally { + writeLock.release(); + } + return null; + } + }; + results.add(executor.submit(acquireReadLock)); + results.add(executor.submit(acquireWriteLock)); + MultithreadedTestUtil.assertOnFutures(results); + } + + private static HReadWriteLockImpl getReadWriteLock(String testName) + throws IOException { + MetadataHandler handler = new 
+      @Override
+      public void handleMetadata(byte[] ownerMetadata) {
+        LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
+      }
+    };
+    ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
+    String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
+
+    return new HReadWriteLockImpl(zkWatcher, znode, handler);
+  }
+
+  @Test(timeout = 30000)
+  public void testWriteLockExcludesReaders() throws Exception {
+    // Submit a write lock request first
+    // Submit a read lock request second
+    final String testName = "testWriteLockExcludesReaders";
+    List<Future<Void>> results = Lists.newArrayList();
+    final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> acquireWriteLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireWriteLock";
+        HWriteLockImpl writeLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        writeLock.acquire();
+        try {
+          writeLockAcquiredLatch.countDown();
+          assertTrue(isLockHeld.compareAndSet(false, true));
+          Thread.sleep(1000);
+        } finally {
+          isLockHeld.set(false);
+          writeLock.release();
+        }
+        return null;
+      }
+    };
+    Callable<Void> acquireReadLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireReadLock";
+        HReadLockImpl readLock =
+            getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+        writeLockAcquiredLatch.await();
+        readLock.acquire();
+        try {
+          assertFalse(isLockHeld.get());
+        } finally {
+          readLock.release();
+        }
+        return null;
+      }
+    };
+    results.add(executor.submit(acquireWriteLock));
+    results.add(executor.submit(acquireReadLock));
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  @Test(timeout = 60000)
+  public void testTimeout() throws Exception {
+    final String testName = "testTimeout";
+    final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> shouldHog = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldHog";
+        HWriteLockImpl lock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        lock.acquire();
+        lockAcquiredLatch.countDown();
+        Thread.sleep(10000);
+        lock.release();
+        return null;
+      }
+    };
+    Callable<Void> shouldTimeout = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldTimeout";
+        HWriteLockImpl lock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        lockAcquiredLatch.await();
+        assertFalse(lock.tryAcquire(5000));
+        return null;
+      }
+    };
+    Callable<Void> shouldAcquireLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldAcquireLock";
+        HWriteLockImpl lock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        lockAcquiredLatch.await();
+        assertTrue(lock.tryAcquire(30000));
+        lock.release();
+        return null;
+      }
+    };
+    List<Future<Void>> results = Lists.newArrayList();
+    results.add(executor.submit(shouldHog));
+    results.add(executor.submit(shouldTimeout));
+    results.add(executor.submit(shouldAcquireLock));
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+}
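Taken together, the patch gives the master a per-table reader-writer lock rooted under
zkWatcher.tableLockZNode. A minimal end-to-end sketch of the intended use (hypothetical
caller code; the TableLockManager wiring referenced by the test changes is not part of
this hunk, and the znode name and metadata are made up):

    HReadWriteLockImpl tableLock = new HReadWriteLockImpl(zkWatcher,
        ZKUtil.joinZNode(zkWatcher.tableLockZNode, "t1"), null);
    HLock writeLock = tableLock.writeLock(Bytes.toBytes("alter:add-family-f2"));
    if (!writeLock.tryAcquire(HConstants.DEFAULT_TABLE_LOCK_TIMEOUT_MS)) {
      throw new IOException("Timed out acquiring table lock for t1");
    }
    try {
      // ... run the schema-change handler under the exclusive table lock ...
    } finally {
      writeLock.release();
    }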