Index: hbase-protocol/src/main/protobuf/ZooKeeper.proto =================================================================== --- hbase-protocol/src/main/protobuf/ZooKeeper.proto (revision 1467393) +++ hbase-protocol/src/main/protobuf/ZooKeeper.proto (working copy) @@ -143,4 +143,5 @@ optional int64 threadId = 3; optional bool isShared = 4; optional string purpose = 5; + optional int64 createTime = 6; } Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java =================================================================== --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (revision 1467393) +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (working copy) @@ -4925,6 +4925,10 @@ // optional string purpose = 5; boolean hasPurpose(); String getPurpose(); + + // optional int64 createTime = 6; + boolean hasCreateTime(); + long getCreateTime(); } public static final class TableLock extends com.google.protobuf.GeneratedMessage @@ -5030,12 +5034,23 @@ } } + // optional int64 createTime = 6; + public static final int CREATETIME_FIELD_NUMBER = 6; + private long createTime_; + public boolean hasCreateTime() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getCreateTime() { + return createTime_; + } + private void initFields() { tableName_ = com.google.protobuf.ByteString.EMPTY; lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); threadId_ = 0L; isShared_ = false; purpose_ = ""; + createTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5070,6 +5085,9 @@ if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBytes(5, getPurposeBytes()); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt64(6, createTime_); + } getUnknownFields().writeTo(output); } @@ -5099,6 +5117,10 @@ size += com.google.protobuf.CodedOutputStream .computeBytesSize(5, getPurposeBytes()); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(6, createTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5147,6 +5169,11 @@ result = result && getPurpose() .equals(other.getPurpose()); } + result = result && (hasCreateTime() == other.hasCreateTime()); + if (hasCreateTime()) { + result = result && (getCreateTime() + == other.getCreateTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5176,6 +5203,10 @@ hash = (37 * hash) + PURPOSE_FIELD_NUMBER; hash = (53 * hash) + getPurpose().hashCode(); } + if (hasCreateTime()) { + hash = (37 * hash) + CREATETIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getCreateTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -5307,6 +5338,8 @@ bitField0_ = (bitField0_ & ~0x00000008); purpose_ = ""; bitField0_ = (bitField0_ & ~0x00000010); + createTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -5369,6 +5402,10 @@ to_bitField0_ |= 0x00000010; } result.purpose_ = purpose_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.createTime_ = createTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5400,6 +5437,9 @@ if (other.hasPurpose()) { setPurpose(other.getPurpose()); } + if (other.hasCreateTime()) { + setCreateTime(other.getCreateTime()); + } 
this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5466,6 +5506,11 @@ purpose_ = input.readBytes(); break; } + case 48: { + bitField0_ |= 0x00000020; + createTime_ = input.readInt64(); + break; + } } } } @@ -5664,6 +5709,27 @@ onChanged(); } + // optional int64 createTime = 6; + private long createTime_ ; + public boolean hasCreateTime() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getCreateTime() { + return createTime_; + } + public Builder setCreateTime(long value) { + bitField0_ |= 0x00000020; + createTime_ = value; + onChanged(); + return this; + } + public Builder clearCreateTime() { + bitField0_ = (bitField0_ & ~0x00000020); + createTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:TableLock) } @@ -5758,11 +5824,12 @@ "tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" + "BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" + "ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" + - "ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \001(", - "\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010thr" + - "eadId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpose" + - "\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" + - "uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" + "ner\030\001 \002(\t\"\207\001\n\tTableLock\022\021\n\ttableName\030\001 \001", + "(\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010th" + + "readId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpos" + + "e\030\005 \001(\t\022\022\n\ncreateTime\030\006 \001(\003BE\n*org.apach" + + "e.hadoop.hbase.protobuf.generatedB\017ZooKe" + + "eperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5854,7 +5921,7 @@ internal_static_TableLock_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TableLock_descriptor, - new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", }, + new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", "CreateTime", }, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class); return null; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java (revision 1467393) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java (working copy) @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; @@ -73,6 +74,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; @@ -1299,7 +1302,7 @@ // TODO: fixHdfsHoles does not work against splits, since the parent dir lingers on // for some time until children references are deleted. HBCK erroneously sees this as // overlapping regions - HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, null); + HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, false, null); assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported // assert that the split META entry is still there. @@ -1361,7 +1364,7 @@ ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN}); //no LINGERING_SPLIT_PARENT // now fix it. The fix should not revert the region split, but add daughters to META - hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, null); + hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, false, null); assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN}); @@ -1936,6 +1939,71 @@ } } + @Test(timeout=30000) + public void testCheckTableLocks() throws Exception { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(0); + EnvironmentEdgeManager.injectEdge(edge); + // check no errors + HBaseFsck hbck = doFsck(conf, false); + assertNoErrors(hbck); + + ServerName mockName = new ServerName("localhost", 60000, 1); + + // obtain one lock + final TableLockManager tableLockManager = TableLockManager.createTableLockManager(conf, TEST_UTIL.getZooKeeperWatcher(), mockName); + TableLock writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks"); + writeLock.acquire(); + hbck = doFsck(conf, false); + assertNoErrors(hbck); // should not have expired, no problems + + edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, + TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let table lock expire + + hbck = doFsck(conf, false); + assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK}); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread() { + public void run() { + TableLock writeLock2 = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks"); // second write-lock attempt; it queues behind the expired lock + try { + latch.countDown(); + writeLock2.acquire(); + } catch (IOException ex) { + fail(); + } catch (IllegalStateException ex) { + return; // expected, since this will be reaped under us.
+ } + fail("should not have come here"); + }; + }.start(); + + latch.await(); // wait until thread starts + Threads.sleep(300); // wait some more to ensure writeLock2.acquire() is called + + hbck = doFsck(conf, false); + assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK}); // still one expired, one not-expired + + edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, + TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let table lock expire + + hbck = doFsck(conf, false); + assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK, ERROR_CODE.EXPIRED_TABLE_LOCK}); // both are expired + + conf.setLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, 1); // reaping from ZKInterProcessWriteLock uses znode cTime, // which is not injectable through EnvironmentEdge + Threads.sleep(10); + hbck = doFsck(conf, true); // now fix both cases + + hbck = doFsck(conf, false); + assertNoErrors(hbck); + + // ensure that locks are deleted + writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "should acquire without blocking"); + writeLock.acquire(); // this should not block. + writeLock.release(); // release for clean state + } + @org.junit.Rule public TestName name = new TestName(); }
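The same repair can also be driven outside of the test harness; a minimal sketch of the programmatic equivalent of -fixTableLocks, assuming the onlineHbck() entry point and the two-argument HBaseFsck constructor that HbckTestingUtil (below) also uses:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.HBaseFsck;

    public class FixTableLocksDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseFsck fsck = new HBaseFsck(conf, new ScheduledThreadPoolExecutor(1));
        fsck.connect();
        fsck.setFixTableLocks(true); // same switch the -fixTableLocks option sets
        fsck.onlineHbck();           // run the checks; expired table locks are reaped
      }
    }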
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java (revision 1467393) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java (working copy) @@ -38,13 +38,13 @@ public static HBaseFsck doFsck( Configuration conf, boolean fix, String table) throws Exception { - return doFsck(conf, fix, fix, fix, fix,fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile, - boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, String table) throws Exception { + boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, String table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); fsck.connect(); fsck.setDisplayFullReport(); // i.e. -details @@ -58,6 +58,7 @@ fsck.setFixVersionFile(fixVersionFile); fsck.setFixReferenceFiles(fixReferenceFiles); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); + fsck.setFixTableLocks(fixTableLocks); if (table != null) { fsck.includeTable(table); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java (revision 1467393) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java (working copy) @@ -287,7 +287,7 @@ writeLocksAttempted.await(); //now reap all table locks - lockManager.reapAllTableWriteLocks(); + lockManager.reapWriteLocks(); TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0); TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager( Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java (working copy) @@ -62,6 +62,7 @@ protected final ZooKeeperWatcher zkWatcher; protected final String parentLockNode; protected final String fullyQualifiedZNode; + protected final String childZNode; protected final byte[] metadata; protected final MetadataHandler handler; @@ -113,18 +114,22 @@ /** Parses sequenceId from the znode name. Zookeeper documentation * states: The sequence number is always fixed length of 10 digits, 0 padded */ - public static int getChildSequenceId(String childZNode) { + public static long getChildSequenceId(String childZNode) { Preconditions.checkNotNull(childZNode); assert childZNode.length() >= 10; String sequenceIdStr = childZNode.substring(childZNode.length() - 10); - return Integer.parseInt(sequenceIdStr); + return Long.parseLong(sequenceIdStr); } @Override public int compare(String zNode1, String zNode2) { - int seq1 = getChildSequenceId(zNode1); - int seq2 = getChildSequenceId(zNode2); - return seq1 - seq2; + long seq1 = getChildSequenceId(zNode1); + long seq2 = getChildSequenceId(zNode2); + if (seq1 == seq2) { + return 0; + } else { + return seq1 < seq2 ? -1 : 1; + } } }
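The move from int to long sequence ids, and from subtraction to explicit comparison, guards against two overflows: a 10-digit, zero-padded sequence can exceed Integer.MAX_VALUE, and even valid ints can overflow when subtracted. A self-contained illustration with hypothetical sequence values:

    public class SequenceCompareDemo {
      public static void main(String[] args) {
        int seq1 = 2000000000;
        int seq2 = -2000000000; // ZooKeeper sequence numbers can roll over to negative
        System.out.println(seq1 - seq2); // -294967296: overflow claims seq1 < seq2
        System.out.println(seq1 > seq2); // true, the ordering the new comparator reports
      }
    }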
@@ -143,6 +148,7 @@ this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode); this.metadata = metadata; this.handler = handler; + this.childZNode = childNode; } /** @@ -233,6 +239,17 @@ } /** + * Check if a child znode represents a read lock. + * @param child The child znode we want to check. + * @return whether the child znode represents a read lock + */ + protected static boolean isChildReadLock(String child) { + int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR); + String suffix = child.substring(idx + 1); + return suffix.startsWith(READ_LOCK_CHILD_NODE_PREFIX); + } + + /** * Check if a child znode represents a write lock. * @param child The child znode we want to check. * @return whether the child znode represents a write lock @@ -244,6 +261,17 @@ } /** + * Check if a child znode represents the same type (read or write) of lock. + * @param child The child znode we want to check. + * @return whether the child znode represents the same type (read or write) of lock + */ + protected boolean isChildOfSameType(String child) { + int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR); + String suffix = child.substring(idx + 1); + return suffix.startsWith(this.childZNode); + } +
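To make the prefix test concrete: sequential children under a lock znode are named with the lock-type prefix followed by the 10-digit sequence, and isChildOfSameType compares the last path component against this lock instance's own child prefix. A small sketch, assuming the conventional "read-"/"write-" prefix values and a hypothetical znode path:

    public class LockChildNameDemo {
      public static void main(String[] args) {
        String child = "/hbase/table-lock/foo/write-0000000007"; // hypothetical path
        String suffix = child.substring(child.lastIndexOf('/') + 1); // '/' is ZKUtil.ZNODE_PATH_SEPARATOR
        System.out.println(suffix.startsWith("write-")); // true: same type as a write lock
        System.out.println(suffix.startsWith("read-"));  // false
      }
    }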

/** * Update state as to indicate that a lock is held * @param createdZNode The lock znode * @throws IOException If an unrecoverable ZooKeeper error occurs @@ -304,32 +332,108 @@ } /** + * Process metadata stored in a ZNode using the callback handler passed to this instance. + * @param lockZNode The node holding the metadata + * @return True if metadata was ready and processed, false otherwise. + */ + protected boolean handleLockMetadata(String lockZNode) { + return handleLockMetadata(lockZNode, handler); + } + + /** * Process metadata stored in a ZNode using a callback object passed to * this instance. * @param lockZNode The node holding the metadata - * @return True if metadata was ready and processed - * @throws IOException If an unexpected ZooKeeper error occurs - * @throws InterruptedException If interrupted when reading the metadata + * @param handler the metadata handler + * @return True if metadata was ready and processed, false on exception. */ - protected boolean handleLockMetadata(String lockZNode) - throws IOException, InterruptedException { - byte[] metadata = null; + protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler) { + if (handler == null) { + return false; + } try { - metadata = ZKUtil.getData(zkWatcher, lockZNode); + byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode); + handler.handleMetadata(metadata); } catch (KeeperException ex) { - LOG.warn("Cannot getData for znode:" + lockZNode, ex); - } - if (metadata == null) { + LOG.warn("Error processing lock metadata in " + lockZNode, ex); return false; } - if (handler != null) { - handler.handleMetadata(metadata); - } return true; } + @Override + public void reapAllLocks() throws IOException { + reapExpiredLocks(0); + } + + /** + * Will delete all lock znodes of this type (either read or write) which are "expired" + * according to the given timeout. The assumption is that the clock skew between ZooKeeper + * and this server is negligible. + * Referred to in the ZooKeeper lock recipe as "Revocable Shared Locks with Freaking Laser Beams" + * (http://zookeeper.apache.org/doc/trunk/recipes.html). + */ + public void reapExpiredLocks(long timeout) throws IOException { + List<String> children; + try { + children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + + KeeperException deferred = null; + Stat stat = new Stat(); + long expireDate = System.currentTimeMillis() - timeout; //we are using cTime in zookeeper + for (String child : children) { + if (isChildOfSameType(child)) { + String znode = ZKUtil.joinZNode(parentLockNode, child); + try { + ZKUtil.getDataNoWatch(zkWatcher, znode, stat); + if (stat.getCtime() < expireDate) { + LOG.info("Reaping lock for znode:" + znode); + ZKUtil.deleteNodeFailSilent(zkWatcher, znode); + } + } catch (KeeperException ex) { + LOG.warn("Error reaping the znode for lock:" + znode, ex); + deferred = ex; + } + } + } + if (deferred != null) { + throw new IOException("ZK exception while reaping locks:", deferred); + } + } + + /** + * Visits the locks (both held and attempted) with the given MetadataHandler. + * @throws IOException If there is an unrecoverable error + */ + public void visitLocks(MetadataHandler handler) throws IOException { + List<String> children; + try { + children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + if (children.size() > 0) { + for (String child : children) { + if (isChildOfSameType(child)) { + String znode = ZKUtil.joinZNode(parentLockNode, child); + String childWatchesZNode = getLockPath(child, children); + if (childWatchesZNode == null) { + LOG.info("Lock is held by: " + child); + } + handleLockMetadata(znode, handler); + } + } + } + }
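visitLocks is the hook the rest of this patch builds on: TableLockManager.visitAllLocks feeds each lock's serialized metadata to a MetadataHandler, and hbck's TableLockChecker (later in this patch) is one such handler. A caller-side sketch, assuming an already-constructed ZKInterProcessReadWriteLock named lock over a table's lock znode:

    InterProcessLock.MetadataHandler printer = new InterProcessLock.MetadataHandler() {
      @Override
      public void handleMetadata(byte[] metadata) {
        // metadata is the pb-magic-prefixed TableLock; TableLockManager.fromBytes decodes it
        ZooKeeperProtos.TableLock data = TableLockManager.fromBytes(metadata);
        if (data != null) {
          System.out.println("lock purpose: " + data.getPurpose());
        }
      }
    };
    lock.readLock(null).visitLocks(printer);  // visit held and attempted read locks
    lock.writeLock(null).visitLocks(printer); // visit held and attempted write locks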
+ /** * Determine based on a list of children under a ZNode, whether or not a * process which created a specified ZNode has obtained a lock. If a lock is * not obtained, return the path that we should watch awaiting its deletion. @@ -343,5 +447,5 @@ * acquired lock. */ protected abstract String getLockPath(String myZNode, List<String> children) - throws IOException, InterruptedException; + throws IOException; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java (working copy) @@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /** * ZooKeeper based write lock: @@ -47,8 +46,7 @@ * {@inheritDoc} */ @Override - protected String getLockPath(String createdZNode, List<String> children) - throws IOException, InterruptedException { + protected String getLockPath(String createdZNode, List<String> children) throws IOException { TreeSet<String> sortedChildren = new TreeSet<String>(ZNodeComparator.COMPARATOR); sortedChildren.addAll(children); @@ -56,43 +54,8 @@ if (pathToWatch != null) { String nodeHoldingLock = sortedChildren.first(); String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock); - try { - handleLockMetadata(znode); - } catch (IOException e) { - LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e); - } + handleLockMetadata(znode); } return pathToWatch; } - - /** - * Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams" - * (http://zookeeper.apache.org/doc/trunk/recipes.html). - */ - public void reapAllLocks() throws IOException { - List<String> children; - try { - children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode); - } catch (KeeperException e) { - LOG.error("Unexpected ZooKeeper error when listing children", e); - throw new IOException("Unexpected ZooKeeper exception", e); - } - - KeeperException deferred = null; - for (String child : children) { - if (isChildWriteLock(child)) { - String znode = ZKUtil.joinZNode(parentLockNode, child); - LOG.info("Reaping write lock for znode:" + znode); - try { - ZKUtil.deleteNodeFailSilent(zkWatcher, znode); - } catch (KeeperException ex) { - LOG.warn("Error reaping the znode for write lock :" + znode); - deferred = ex; - } - } - } - if (deferred != null) { - throw new IOException("ZK exception while reaping locks:", deferred); - } - } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java (working copy) @@ -48,8 +48,7 @@ * {@inheritDoc} */ @Override - protected String getLockPath(String createdZNode, List<String> children) - throws IOException, InterruptedException { + protected String getLockPath(String createdZNode, List<String> children) throws IOException { TreeSet<String> writeChildren = new TreeSet<String>(ZNodeComparator.COMPARATOR); for (String child : children) { @@ -67,17 +66,8 @@ String pathToWatch = lowerChildren.last(); String nodeHoldingLock = lowerChildren.first(); String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock); - try { - handleLockMetadata(znode); - } catch (IOException e) { - LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e); - } + handleLockMetadata(znode); + return pathToWatch; } - - @Override - public void reapAllLocks() throws IOException { - throw new UnsupportedOperationException( - "Lock reaping is not supported for ZK based read locks"); - } }
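With reaping and metadata handling hoisted into the base class, the read and write locks now differ only in getLockPath. A usage sketch of the underlying primitive, assuming an existing ZooKeeperWatcher zkWatcher, a hypothetical lock znode path, and that acquire()/release() may throw IOException or InterruptedException:

    ZKInterProcessReadWriteLock rwLock = new ZKInterProcessReadWriteLock(
        zkWatcher, "/hbase/table-lock/foo", null); // null: no MetadataHandler
    InterProcessLock writeLock = rwLock.writeLock(null); // null metadata for brevity
    writeLock.acquire(); // creates a sequential write- child and waits its turn
    try {
      // ... perform the schema change under the exclusive lock ...
    } finally {
      writeLock.release(); // deletes our child znode
    }
    writeLock.reapExpiredLocks(600 * 1000); // force-delete write children older than 10 min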
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java (working copy) @@ -28,18 +28,20 @@ public interface InterProcessReadWriteLock { /** - * Obtain a reader lock containing given metadata. + * Obtain a read lock containing given metadata. * @param metadata Serialized lock metadata (this may contain information * such as the process owning the lock or the purpose for - * which the lock was acquired). Must not be null. - * @return An instantiated InterProcessReadWriteLock instance + * which the lock was acquired). + * @return An instantiated InterProcessLock instance */ public InterProcessLock readLock(byte[] metadata); /** - * Obtain a writer lock containing given metadata. - * @param metadata See documentation of metadata parameter in readLock() - * @return An instantiated InterProcessReadWriteLock instance + * Obtain a write lock containing given metadata. + * @param metadata Serialized lock metadata (this may contain information + * such as the process owning the lock or the purpose for + * which the lock was acquired). + * @return An instantiated InterProcessLock instance */ public InterProcessLock writeLock(byte[] metadata); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (working copy) @@ -262,8 +262,7 @@ * Create all of the RegionPlan's needed to move from the initial cluster state to the desired * state. * - * @param initialRegionMapping Initial mapping of Region to Server - * @param clusterState The desired mapping of ServerName to Regions + * @param cluster The state of the cluster * @return List of RegionPlan's that represent the moves needed to get to desired final state. */ private List<RegionPlan> createRegionPlans(Cluster cluster) { @@ -325,7 +324,8 @@ * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move * rather than swap. * - * @param regions list of regions. + * @param cluster The state of the cluster + * @param server index of the server * @param chanceOfNoSwap Chance that this will decide to try a move rather * than a swap. * @return a random {@link HRegionInfo} or null if an asymmetrical move is @@ -346,8 +346,8 @@ * Given a server we will want to switch regions with another server. This * function picks a random server from the list. * - * @param server Current Server. This server will never be the return value. - * @param allServers list of all server from which to pick + * @param serverIndex Current Server. This server will never be the return value. + * @param cluster The state of the cluster * @return random server. Null if no other servers were found. */ private int pickOtherServer(int serverIndex, Cluster cluster) { @@ -366,8 +366,7 @@ * This is the main cost function. It will compute a cost associated with a proposed cluster * state. All different costs will be combined with their multipliers to produce a double cost. * - * @param initialRegionMapping Map of where the regions started. - * @param clusterState Map of ServerName to list of regions. + * @param cluster The state of the cluster * @return a double of a cost associated with the proposed state */ protected double computeCost(Cluster cluster) { @@ -410,8 +409,7 @@ * Given the starting state of the regions and a potential ending state * compute cost based upon the number of regions that have moved. * - * @param initialRegionMapping The starting location of regions. - * @param clusterState The potential new cluster state. + * @param cluster The state of the cluster * @return The cost. Between 0 and 1. */ double computeMoveCost(Cluster cluster) { @@ -435,7 +433,7 @@ * Compute the cost of a potential cluster state from skew in number of * regions on a cluster * - * @param clusterState The proposed cluster state + * @param cluster The state of the cluster * @return The cost of region load imbalance. */ double computeSkewLoadCost(Cluster cluster) { @@ -450,7 +448,7 @@ * Compute the cost of a potential cluster configuration based upon how evenly * distributed tables are. * - * @param clusterState Proposed cluster state. + * @param cluster The state of the cluster * @return Cost of imbalance in table. */ double computeTableSkewLoadCost(Cluster cluster) { @@ -469,8 +467,7 @@ * Compute a cost of a potential cluster configuration based upon where * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located. * - * @param initialRegionMapping - not used - * @param clusterState The state of the cluster + * @param cluster The state of the cluster * @return A cost between 0 and 1. 0 means all regions are on the server with * the most local store files. */ @@ -518,7 +515,7 @@ /** * Compute the cost of the current cluster state due to some RegionLoadCost type * - * @param clusterState the cluster + * @param cluster The state of the cluster * @param costType what type of cost to consider * @return the scaled cost. */ Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -721,7 +721,7 @@ //are invalidated this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName); if (!masterRecovery) { - this.tableLockManager.reapAllTableWriteLocks(); + this.tableLockManager.reapWriteLocks(); } status.setStatus("Initializing ZK system trackers"); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock; @@ -66,12 +67,17 @@ protected static final String TABLE_READ_LOCK_TIMEOUT_MS = "hbase.table.read.lock.timeout.ms"; - protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = + protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default - protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = + protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default + public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms"; + + public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS = + 600 * 1000; //10 min default + /** * A distributed lock for a table. */ @@ -109,14 +115,32 @@ public abstract TableLock readLock(byte[] tableName, String purpose); /** - * Force releases all table write locks and lock attempts even if this thread does + * Visits all table locks (read and write) and lock attempts with the given callback + * MetadataHandler. + * @param handler the metadata handler to call + * @throws IOException If there is an unrecoverable error + */ + public abstract void visitAllLocks(MetadataHandler handler) throws IOException; + + /** + * Force releases all table locks (read and write) that have been held longer than + * "hbase.table.lock.expire.ms". The assumption is that the clock skew between ZooKeeper + * and this server is negligible.
+ * The behavior of the lock holders still thinking that they have the lock is undefined. + * @throws IOException If there is an unrecoverable error + */ + public abstract void reapAllExpiredLocks() throws IOException; + + /** + * Force releases table write locks and lock attempts even if this thread does * not own the lock. The behavior of the lock holders still thinking that they * have the lock is undefined. This should be used carefully and only when * we can ensure that all write-lock holders have died. For example if only * the master can hold write locks, then we can reap its locks when the backup * master starts. + * @throws IOException If there is an unrecoverable error */ - public abstract void reapAllTableWriteLocks() throws IOException; + public abstract void reapWriteLocks() throws IOException; /** * Called after a table has been deleted, and after the table lock is released. @@ -135,11 +159,14 @@ // Initialize table level lock manager for schema changes, if enabled. if (conf.getBoolean(TABLE_LOCK_ENABLE, DEFAULT_TABLE_LOCK_ENABLE)) { - int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS, + long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS, DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS); - int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS, + long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS, DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS); - return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs); + long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT, + DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS); + + return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs); } return new NullTableLockManager(); @@ -167,13 +194,35 @@ return new NullTableLock(); } @Override - public void reapAllTableWriteLocks() throws IOException { + public void reapAllExpiredLocks() throws IOException { } @Override + public void reapWriteLocks() throws IOException { + } + @Override public void tableDeleted(byte[] tableName) throws IOException { } + @Override + public void visitAllLocks(MetadataHandler handler) throws IOException { + } } + /** Public for hbck */ + public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + if (bytes == null || bytes.length < pblen) { + return null; + } + try { + ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom( + bytes, pblen, bytes.length - pblen).build(); + return data; + } catch (InvalidProtocolBufferException ex) { + LOG.warn("Exception in deserialization", ex); + } + return null; + } + /** * ZooKeeper based TableLockManager */ @@ -192,9 +241,9 @@ } LOG.debug("Table is locked by: " + String.format("[tableName=%s, lockOwner=%s, threadId=%s, " + - "purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()), + "purpose=%s, isShared=%s, createTime=%s]", Bytes.toString(data.getTableName().toByteArray()), ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(), - data.getPurpose(), data.getIsShared())); + data.getPurpose(), data.getIsShared(), data.getCreateTime())); } }; @@ -278,7 +327,8 @@ .setLockOwner(ProtobufUtil.toServerName(serverName)) .setThreadId(Thread.currentThread().getId()) .setPurpose(purpose) - .setIsShared(isShared).build(); + .setIsShared(isShared) + .setCreateTime(EnvironmentEdgeManager.currentTimeMillis()).build(); byte[] lockMetadata = toBytes(data); InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode, @@ -291,25 +341,11 @@ return ProtobufUtil.prependPBMagic(data.toByteArray()); } - private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - if (bytes == null || bytes.length < pblen) { - return null; - } - try { - ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom( - bytes, pblen, bytes.length - pblen).build(); - return data; - } catch (InvalidProtocolBufferException ex) { - LOG.warn("Exception in deserialization", ex); - } - return null; - } - private final ServerName serverName; private final ZooKeeperWatcher zkWatcher; private final long writeLockTimeoutMs; private final long readLockTimeoutMs; + private final long lockExpireTimeoutMs; /** * Initialize a new manager for table-level locks. @@ -322,11 +358,12 @@ * given table, or -1 for no timeout */ public ZKTableLockManager(ZooKeeperWatcher zkWatcher, - ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) { + ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) { this.zkWatcher = zkWatcher; this.serverName = serverName; this.writeLockTimeoutMs = writeLockTimeoutMs; this.readLockTimeoutMs = readLockTimeoutMs; + this.lockExpireTimeoutMs = lockExpireTimeoutMs; } @Override @@ -340,19 +377,33 @@ serverName, readLockTimeoutMs, true, purpose); } + @Override + public void visitAllLocks(MetadataHandler handler) throws IOException { + for (String tableName : getTableNames()) { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( + zkWatcher, tableLockZNode, null); + lock.readLock(null).visitLocks(handler); + lock.writeLock(null).visitLocks(handler); + } + } + + private List<String> getTableNames() throws IOException { + List<String> tableNames; + try { + tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + return tableNames; + } + @Override - public void reapAllTableWriteLocks() throws IOException { + public void reapWriteLocks() throws IOException { //get the table names try { - List<String> tableNames; - try { - tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode); - } catch (KeeperException e) { - LOG.error("Unexpected ZooKeeper error when listing children", e); - throw new IOException("Unexpected ZooKeeper exception", e); - } - - for (String tableName : tableNames) { + for (String tableName : getTableNames()) { String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( zkWatcher, tableLockZNode, null); @@ -366,6 +417,24 @@ } @Override + public void reapAllExpiredLocks() throws IOException { + //get the table names + try { + for (String tableName : getTableNames()) { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( + zkWatcher, tableLockZNode, null); + lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs); + lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs); + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + } +
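The createTime written above is exactly what the hbck-side expiry check reads back. A round-trip sketch of the metadata encoding, assuming direct use of the generated builder and the usual protobuf/HBase imports:

    ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
        .setTableName(ByteString.copyFrom(Bytes.toBytes("foo"))) // hypothetical table
        .setThreadId(Thread.currentThread().getId())
        .setIsShared(false)
        .setPurpose("demo")
        .setCreateTime(EnvironmentEdgeManager.currentTimeMillis())
        .build();
    byte[] bytes = ProtobufUtil.prependPBMagic(data.toByteArray()); // the znode payload
    ZooKeeperProtos.TableLock decoded = TableLockManager.fromBytes(bytes);
    assert decoded != null && decoded.getCreateTime() == data.getCreateTime();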
@Override public void tableDeleted(byte[] tableName) throws IOException { //table write lock from DeleteHandler is already released, just delete the parent znode String tableNameStr = Bytes.toString(tableName); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java (working copy) @@ -63,11 +63,23 @@ /** * If supported, attempts to reap all the locks of this type by forcefully - * deleting the locks. Lock reaping is different than coordinated lock revocation + * deleting the locks (both held and attempted) that have expired according + * to the given timeout. Lock reaping is different than coordinated lock revocation * in that, there is no coordination, and the behavior is undefined if the * lock holder is still alive. * @throws IOException If there is an unrecoverable error reaping the locks */ + public void reapExpiredLocks(long expireTimeoutMs) throws IOException; + + /** + * If supported, attempts to reap all the locks of this type by forcefully + * deleting the locks (both held and attempted). Lock reaping is different + * than coordinated lock revocation in that, there is no coordination, and + * the behavior is undefined if the lock holder is still alive. + * Calling this should have the same effect as calling {@link #reapExpiredLocks(long)} + * with timeout=0. + * @throws IOException If there is an unrecoverable error reaping the locks + */ public void reapAllLocks() throws IOException; /** @@ -83,4 +95,11 @@ */ public void handleMetadata(byte[] metadata); } + + /** + * Visits the locks (both held and attempted) of this type with the given + * {@link MetadataHandler}. + * @throws IOException If there is an unrecoverable error + */ + public void visitLocks(MetadataHandler handler) throws IOException; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (revision 1467393) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (working copy) @@ -64,9 +64,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Delete; @@ -82,6 +80,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; +import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.master.MasterFileSystem; @@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; +import org.apache.hadoop.hbase.util.hbck.TableLockChecker; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import
org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -197,6 +198,7 @@ private boolean fixSplitParents = false; // fix lingering split parents private boolean fixReferenceFiles = false; // fix lingering reference store file private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows + private boolean fixTableLocks = false; // fix table locks which are expired // limit checking/fixes to listed tables, if empty attempt to check/fix all // .META. are always checked @@ -455,6 +457,8 @@ offlineReferenceFileRepair(); + checkAndFixTableLocks(); + // Print table summary printTableSummary(tablesInfo); return errors.summarize(); @@ -2471,6 +2475,15 @@ return hbi; } + private void checkAndFixTableLocks() throws IOException { + TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors); + checker.checkTableLocks(); + + if (this.fixTableLocks) { + checker.fixExpiredTableLocks(); + } + } + /** * Check values in regionInfo for .META. * Check if zero or more than one regions with META are found. @@ -2560,7 +2573,7 @@ Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(result); if (pair == null || pair.getFirst() == null) { emptyRegionInfoQualifiers.add(result); - errors.reportError(ERROR_CODE.EMPTY_META_CELL, + errors.reportError(ERROR_CODE.EMPTY_META_CELL, "Empty REGIONINFO_QUALIFIER found in .META."); return true; } @@ -2897,7 +2910,7 @@ FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS, HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, - WRONG_USAGE, EMPTY_META_CELL + WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK } public void clear(); public void report(String message); @@ -3238,6 +3251,14 @@ } /** + * Set table locks fix mode. + * Delete table locks held longer than the expire timeout (hbase.table.lock.expire.ms). + */ + public void setFixTableLocks(boolean shouldFix) { + fixTableLocks = shouldFix; + } + + /** * Check if we should rerun fsck again. This checks if we've tried to * fix something and we should rerun fsck tool again. * Display the full report from fsck.
This displays all live and dead @@ -3476,9 +3497,13 @@ out.println(""); out.println(" Metadata Repair shortcuts"); out.println(" -repair Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " + - "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles"); + "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles -fixTableLocks"); out.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles"); + out.println(""); + out.println(" Table lock options"); + out.println(" -fixTableLocks Deletes table locks held longer than hbase.table.lock.expire.ms (10 min by default)"); + out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -3603,6 +3628,7 @@ setFixSplitParents(false); setCheckHdfs(true); setFixReferenceFiles(true); + setFixTableLocks(true); } else if (cmd.equals("-repairHoles")) { // this will make all missing hdfs regions available but may lose data setFixHdfsHoles(true); @@ -3647,6 +3673,8 @@ setSummary(); } else if (cmd.equals("-metaonly")) { setCheckMetaOnly(); + } else if (cmd.equals("-fixTableLocks")) { + setFixTableLocks(true); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java (revision 0) @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util.hbck; + +import java.io.IOException; + +import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HBaseFsck; +import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Utility to check and fix table locks. Needs a ZooKeeper connection. + */ +public class TableLockChecker { + + private final ZooKeeperWatcher zkWatcher; + private final ErrorReporter errorReporter; + private final long expireTimeout; + + public TableLockChecker(ZooKeeperWatcher zkWatcher, ErrorReporter errorReporter) { + this.zkWatcher = zkWatcher; + this.errorReporter = errorReporter; + expireTimeout = zkWatcher.getConfiguration().getLong( + TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, + TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS); + } + + public void checkTableLocks() throws IOException { + TableLockManager tableLockManager + = TableLockManager.createTableLockManager(zkWatcher.getConfiguration(), zkWatcher, null); + final long expireDate = EnvironmentEdgeManager.currentTimeMillis() - expireTimeout; + + MetadataHandler handler = new MetadataHandler() { + @Override + public void handleMetadata(byte[] ownerMetadata) { + ZooKeeperProtos.TableLock data = TableLockManager.fromBytes(ownerMetadata); + String msg = "Table lock acquire attempt found:"; + if (data != null) { + msg = msg + + String.format("[tableName=%s, lockOwner=%s, threadId=%s, " + + "purpose=%s, isShared=%s, createTime=%s]", Bytes.toString(data.getTableName().toByteArray()), + ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(), + data.getPurpose(), data.getIsShared(), data.getCreateTime()); + } + + if (data != null && data.hasCreateTime() && data.getCreateTime() < expireDate) { + errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.EXPIRED_TABLE_LOCK, msg); + } else { + errorReporter.print(msg); + } + } + }; + + tableLockManager.visitAllLocks(handler); + } + + public void fixExpiredTableLocks() throws IOException { + TableLockManager tableLockManager + = TableLockManager.createTableLockManager(zkWatcher.getConfiguration(), zkWatcher, null); + + tableLockManager.reapAllExpiredLocks(); + } + +}
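A worked example of the expiry rule the checker applies, with hypothetical timestamps:

    public class ExpiryRuleDemo {
      public static void main(String[] args) {
        long expireTimeout = 600 * 1000;       // hbase.table.lock.expire.ms default: 10 min
        long now = 1000000000L;                // hypothetical current time, in ms
        long expireDate = now - expireTimeout; // locks created before this are expired
        long createTime = 999000000L;          // lock created 1,000,000 ms (~16.7 min) ago
        System.out.println(createTime < expireDate); // true: reported as EXPIRED_TABLE_LOCK
      }
    }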