diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 04b094a..1395704 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4925,6 +4925,10 @@ public final class ZooKeeperProtos {
     // optional string purpose = 5;
     boolean hasPurpose();
     String getPurpose();
+
+    // optional int64 createTime = 6;
+    boolean hasCreateTime();
+    long getCreateTime();
   }
   public static final class TableLock extends
       com.google.protobuf.GeneratedMessage
@@ -5030,12 +5034,23 @@ public final class ZooKeeperProtos {
       }
     }

+    // optional int64 createTime = 6;
+    public static final int CREATETIME_FIELD_NUMBER = 6;
+    private long createTime_;
+    public boolean hasCreateTime() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getCreateTime() {
+      return createTime_;
+    }
+
     private void initFields() {
       tableName_ = com.google.protobuf.ByteString.EMPTY;
       lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
       threadId_ = 0L;
       isShared_ = false;
       purpose_ = "";
+      createTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5070,6 +5085,9 @@ public final class ZooKeeperProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBytes(5, getPurposeBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(6, createTime_);
+      }
       getUnknownFields().writeTo(output);
     }

@@ -5099,6 +5117,10 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(5, getPurposeBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, createTime_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5147,6 +5169,11 @@ public final class ZooKeeperProtos {
         result = result && getPurpose()
             .equals(other.getPurpose());
       }
+      result = result && (hasCreateTime() == other.hasCreateTime());
+      if (hasCreateTime()) {
+        result = result && (getCreateTime()
+            == other.getCreateTime());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5176,6 +5203,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + PURPOSE_FIELD_NUMBER;
         hash = (53 * hash) + getPurpose().hashCode();
       }
+      if (hasCreateTime()) {
+        hash = (37 * hash) + CREATETIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getCreateTime());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -5307,6 +5338,8 @@ public final class ZooKeeperProtos {
         bitField0_ = (bitField0_ & ~0x00000008);
         purpose_ = "";
         bitField0_ = (bitField0_ & ~0x00000010);
+        createTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }

@@ -5369,6 +5402,10 @@ public final class ZooKeeperProtos {
           to_bitField0_ |= 0x00000010;
         }
         result.purpose_ = purpose_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.createTime_ = createTime_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5400,6 +5437,9 @@ public final class ZooKeeperProtos {
         if (other.hasPurpose()) {
           setPurpose(other.getPurpose());
         }
+        if (other.hasCreateTime()) {
+          setCreateTime(other.getCreateTime());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5466,6 +5506,11 @@ public final class ZooKeeperProtos {
               purpose_ = input.readBytes();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              createTime_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -5664,6 +5709,27 @@ public final class ZooKeeperProtos {
         onChanged();
       }

+      // optional int64 createTime = 6;
+      private long createTime_ ;
+      public boolean hasCreateTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getCreateTime() {
+        return createTime_;
+      }
+      public Builder setCreateTime(long value) {
+        bitField0_ |= 0x00000020;
+        createTime_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearCreateTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        createTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:TableLock)
     }
@@ -5758,11 +5824,12 @@ public final class ZooKeeperProtos {
      "tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
      "BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" +
      "ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" +
-      "ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \001(",
-      "\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010thr" +
-      "eadId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpose" +
-      "\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" +
-      "uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+      "ner\030\001 \002(\t\"\207\001\n\tTableLock\022\021\n\ttableName\030\001 \001",
+      "(\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010th" +
+      "readId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpos" +
+      "e\030\005 \001(\t\022\022\n\ncreateTime\030\006 \001(\003BE\n*org.apach" +
+      "e.hadoop.hbase.protobuf.generatedB\017ZooKe" +
+      "eperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5854,7 +5921,7 @@ public final class ZooKeeperProtos {
           internal_static_TableLock_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_TableLock_descriptor,
-              new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", },
+              new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", "CreateTime", },
               org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class,
               org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class);
           return null;
diff --git hbase-protocol/src/main/protobuf/ZooKeeper.proto hbase-protocol/src/main/protobuf/ZooKeeper.proto
index f71a35b..0e099d1 100644
--- hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -143,4 +143,5 @@ message TableLock {
   optional int64 threadId = 3;
   optional bool isShared = 4;
   optional string purpose = 5;
+  optional int64 createTime = 6;
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java
index 0e2301c..d310887 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java
@@ -68,7 +68,7 @@ public interface InterProcessLock {
   * lock holder is still alive.
   * @throws IOException If there is an unrecoverable error reaping the locks
   */
-  public void reapAllLocks() throws IOException;
+  public void reapAllWriteLocks() throws IOException;

  /**
   * An interface for objects that process lock metadata.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
index 75b2312..9a04e62 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
@@ -66,12 +67,17 @@ public abstract class TableLockManager {
   protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
     "hbase.table.read.lock.timeout.ms";

-  protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
+  protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
     600 * 1000; //10 min default

-  protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
+  protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
     600 * 1000; //10 min default

+  public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
+
+  public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
+    600 * 1000; //10 min default
+
   /**
    * A distributed lock for a table.
    */
@@ -109,12 +115,28 @@ public abstract class TableLockManager {
   public abstract TableLock readLock(byte[] tableName, String purpose);

   /**
+   * Visits all table locks and lock attempts with the given callback
+   * MetadataHandler.
+   * @param handler the metadata handler to call
+   * @throws IOException If there is an unrecoverable error
+   */
+  public abstract void visitLocks(MetadataHandler handler) throws IOException;
+
+  /**
+   * Force releases table locks that have been held longer than "hbase.table.lock.expire.ms".
+   * The behavior of lock holders that still believe they hold the lock is undefined.
+   * @throws IOException If there is an unrecoverable error
+   */
+  public abstract void reapExpiredLocks() throws IOException;
+
+  /**
    * Force releases all table write locks and lock attempts even if this thread does
    * not own the lock. The behavior of the lock holders still thinking that they
    * have the lock is undefined. This should be used carefully and only when
    * we can ensure that all write-lock holders have died. For example if only
    * the master can hold write locks, then we can reap it's locks when the backup
    * master starts.
+   * @throws IOException If there is an unrecoverable error
    */
   public abstract void reapAllTableWriteLocks() throws IOException;

@@ -135,11 +157,14 @@ public abstract class TableLockManager {
     // Initialize table level lock manager for schema changes, if enabled.
     if (conf.getBoolean(TABLE_LOCK_ENABLE, DEFAULT_TABLE_LOCK_ENABLE)) {
-      int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS,
+      long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
         DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
-      int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS,
+      long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
         DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
-      return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs);
+      long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
+        DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
+
+      return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
     }

     return new NullTableLockManager();
@@ -167,11 +192,33 @@ public abstract class TableLockManager {
       return new NullTableLock();
     }
     @Override
+    public void reapExpiredLocks() throws IOException {
+    }
+    @Override
     public void reapAllTableWriteLocks() throws IOException {
     }
     @Override
     public void tableDeleted(byte[] tableName) throws IOException {
     }
+    @Override
+    public void visitLocks(MetadataHandler handler) throws IOException {
+    }
+  }
+
+  /** Public for hbck */
+  public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    if (bytes == null || bytes.length < pblen) {
+      return null;
+    }
+    try {
+      ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
+          bytes, pblen, bytes.length - pblen).build();
+      return data;
+    } catch (InvalidProtocolBufferException ex) {
+      LOG.warn("Exception in deserialization", ex);
+    }
+    return null;
   }

   /**
@@ -192,9 +239,9 @@ public abstract class TableLockManager {
       }
       LOG.debug("Table is locked by: " +
         String.format("[tableName=%s, lockOwner=%s, threadId=%s, " +
-          "purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()),
+          "purpose=%s, isShared=%s, createTime=%s]", Bytes.toString(data.getTableName().toByteArray()),
           ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
-          data.getPurpose(), data.getIsShared()));
+          data.getPurpose(), data.getIsShared(), data.getCreateTime()));
     }
   };

@@ -278,7 +325,8 @@ public abstract class TableLockManager {
         .setLockOwner(ProtobufUtil.toServerName(serverName))
         .setThreadId(Thread.currentThread().getId())
         .setPurpose(purpose)
-        .setIsShared(isShared).build();
+        .setIsShared(isShared)
+        .setCreateTime(EnvironmentEdgeManager.currentTimeMillis()).build();
       byte[] lockMetadata = toBytes(data);

       InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
@@ -291,25 +339,11 @@ public abstract class TableLockManager {
       return ProtobufUtil.prependPBMagic(data.toByteArray());
     }

-    private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
-      int pblen = ProtobufUtil.lengthOfPBMagic();
-      if (bytes == null || bytes.length < pblen) {
-        return null;
-      }
-      try {
-        ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
-            bytes, pblen, bytes.length - pblen).build();
-        return data;
-      } catch (InvalidProtocolBufferException ex) {
-        LOG.warn("Exception in deserialization", ex);
-      }
-      return null;
-    }
-
     private final ServerName serverName;
     private final ZooKeeperWatcher zkWatcher;
     private final long writeLockTimeoutMs;
     private final long readLockTimeoutMs;
+    private final long lockExpireTimeoutMs;

     /**
      * Initialize a new manager for table-level locks.
@@ -322,11 +356,12 @@ public abstract class TableLockManager {
      * given table, or -1 for no timeout
      */
     public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
-        ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) {
+        ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
       this.zkWatcher = zkWatcher;
       this.serverName = serverName;
       this.writeLockTimeoutMs = writeLockTimeoutMs;
       this.readLockTimeoutMs = readLockTimeoutMs;
+      this.lockExpireTimeoutMs = lockExpireTimeoutMs;
     }

     @Override
@@ -340,28 +375,58 @@ public abstract class TableLockManager {
         serverName, readLockTimeoutMs, true, purpose);
     }

+    public void visitLocks(MetadataHandler handler) throws IOException {
+      for (String tableName : getTableNames()) {
+        String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
+        ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
+            zkWatcher, tableLockZNode, null);
+        lock.writeLock(null).visitLocks(handler);
+      }
+    }
+
+    private List<String> getTableNames() throws IOException {
+
+      List<String> tableNames;
+      try {
+        tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
+      } catch (KeeperException e) {
+        LOG.error("Unexpected ZooKeeper error when listing children", e);
+        throw new IOException("Unexpected ZooKeeper exception", e);
+      }
+      return tableNames;
+    }
+
     @Override
     public void reapAllTableWriteLocks() throws IOException {
       //get the table names
       try {
-        List<String> tableNames;
-        try {
-          tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
-        } catch (KeeperException e) {
-          LOG.error("Unexpected ZooKeeper error when listing children", e);
-          throw new IOException("Unexpected ZooKeeper exception", e);
+        for (String tableName : getTableNames()) {
+          String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
+          ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
+              zkWatcher, tableLockZNode, null);
+          lock.writeLock(null).reapAllWriteLocks();
         }
+      } catch (IOException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        LOG.warn("Caught exception while reaping table write locks", ex);
+      }
+    }

-        for (String tableName : tableNames) {
+    @Override
+    public void reapExpiredLocks() throws IOException {
+      //get the table names
+      try {
+        for (String tableName : getTableNames()) {
           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
               zkWatcher, tableLockZNode, null);
-          lock.writeLock(null).reapAllLocks();
+          lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
         }
       } catch (IOException ex) {
         throw ex;
       } catch (Exception ex) {
-        LOG.warn("Caught exception while reaping table write locks", ex);
+        throw new IOException(ex);
       }
     }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 886ae3b..e1768cb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -64,9 +64,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.AdminProtocol;
 import org.apache.hadoop.hbase.client.Delete;
@@ -82,6 +80,8 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
+import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -197,6 +198,7 @@ public class HBaseFsck extends Configured implements Tool {
   private boolean fixSplitParents = false; // fix lingering split parents
   private boolean fixReferenceFiles = false; // fix lingering reference store file
   private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
+  private boolean fixTableLocks = false; // fix table locks which are expired

   // limit checking/fixes to listed tables, if empty attempt to check/fix all
   // .META. are always checked
@@ -455,6 +457,8 @@ public class HBaseFsck extends Configured implements Tool {
     offlineReferenceFileRepair();

+    checkAndFixTableLocks();
+
     // Print table summary
     printTableSummary(tablesInfo);
     return errors.summarize();
@@ -2471,6 +2475,15 @@ public class HBaseFsck extends Configured implements Tool {
     return hbi;
   }

+  private void checkAndFixTableLocks() throws IOException {
+    TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors);
+    checker.checkTableLocks();
+
+    if (this.fixTableLocks) {
+      checker.fixExpiredTableLocks();
+    }
+  }
+
   /**
    * Check values in regionInfo for .META.
    * Check if zero or more than one regions with META are found.
@@ -2560,7 +2573,7 @@ public class HBaseFsck extends Configured implements Tool {
       Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(result);
       if (pair == null || pair.getFirst() == null) {
         emptyRegionInfoQualifiers.add(result);
-        errors.reportError(ERROR_CODE.EMPTY_META_CELL, 
+        errors.reportError(ERROR_CODE.EMPTY_META_CELL,
           "Empty REGIONINFO_QUALIFIER found in .META.");
         return true;
       }
@@ -2897,7 +2910,7 @@ public class HBaseFsck extends Configured implements Tool {
       FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
       HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
       ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
-      WRONG_USAGE, EMPTY_META_CELL
+      WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK
     }
     public void clear();
     public void report(String message);
@@ -3238,6 +3251,14 @@ public class HBaseFsck extends Configured implements Tool {
   }

   /**
+   * Set table locks fix mode.
+   * Deletes table locks held for a long time.
+   */
+  public void setFixTableLocks(boolean shouldFix) {
+    fixTableLocks = shouldFix;
+  }
+
+  /**
    * Check if we should rerun fsck again. This checks if we've tried to
    * fix something and we should rerun fsck tool again.
    * Display the full report from fsck. This displays all live and dead
@@ -3476,9 +3497,13 @@ public class HBaseFsck extends Configured implements Tool {
     out.println("");
     out.println("  Metadata Repair shortcuts");
     out.println("   -repair           Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
-        "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles");
+        "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles -fixTableLocks");
     out.println("   -repairHoles      Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");

+    out.println("");
+    out.println("  Table lock options");
+    out.println("   -fixTableLocks    Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10 min by default)");
+
     out.flush();
     errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
@@ -3603,6 +3628,7 @@ public class HBaseFsck extends Configured implements Tool {
       setFixSplitParents(false);
       setCheckHdfs(true);
       setFixReferenceFiles(true);
+      setFixTableLocks(true);
     } else if (cmd.equals("-repairHoles")) {
       // this will make all missing hdfs regions available but may lose data
       setFixHdfsHoles(true);
@@ -3647,6 +3673,8 @@ public class HBaseFsck extends Configured implements Tool {
         setSummary();
       } else if (cmd.equals("-metaonly")) {
         setCheckMetaOnly();
+      } else if (cmd.equals("-fixTableLocks")) {
+        setFixTableLocks(true);
       } else if (cmd.startsWith("-")) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
         return printUsageAndExit();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java
index 5981c77..944a91c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java
@@ -113,18 +113,22 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {

     /** Parses sequenceId from the znode name. Zookeeper documentation
      * states: The sequence number is always fixed length of 10 digits, 0 padded */
-    public static int getChildSequenceId(String childZNode) {
+    public static long getChildSequenceId(String childZNode) {
       Preconditions.checkNotNull(childZNode);
       assert childZNode.length() >= 10;
       String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
-      return Integer.parseInt(sequenceIdStr);
+      return Long.parseLong(sequenceIdStr);
     }

     @Override
     public int compare(String zNode1, String zNode2) {
-      int seq1 = getChildSequenceId(zNode1);
-      int seq2 = getChildSequenceId(zNode2);
-      return seq1 - seq2;
+      long seq1 = getChildSequenceId(zNode1);
+      long seq2 = getChildSequenceId(zNode2);
+      if (seq1 == seq2) {
+        return 0;
+      } else {
+        return seq1 < seq2 ? -1 : 1;
+      }
     }
   }
@@ -304,28 +308,38 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
   }

   /**
+   * Process metadata stored in a ZNode using the callback handler registered with this instance.
+   * <p>
+   * @param lockZNode The node holding the metadata
+   * @return True if metadata was ready and processed
+   * @throws IOException If an unexpected ZooKeeper error occurs
+   */
+  protected boolean handleLockMetadata(String lockZNode)
+      throws IOException {
+    return handleLockMetadata(lockZNode, handler);
+  }
+
+  /**
    * Process metadata stored in a ZNode using a callback object passed to
    * this instance.
    * <p>
    * @param lockZNode The node holding the metadata
+   * @param handler the metadata handler
    * @return True if metadata was ready and processed
    * @throws IOException If an unexpected ZooKeeper error occurs
-   * @throws InterruptedException If interrupted when reading the metadata
    */
-  protected boolean handleLockMetadata(String lockZNode)
-    throws IOException, InterruptedException {
-    byte[] metadata = null;
+  protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler)
+      throws IOException {
+    if (handler == null) {
+      return false;
+    }
     try {
-      metadata = ZKUtil.getData(zkWatcher, lockZNode);
+      byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode);
+      handler.handleMetadata(metadata);
     } catch (KeeperException ex) {
       LOG.warn("Cannot getData for znode:" + lockZNode, ex);
-    }
-    if (metadata == null) {
       return false;
     }
-    if (handler != null) {
-      handler.handleMetadata(metadata);
-    }
     return true;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java
index 87aa152..8dff246 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java
@@ -76,7 +76,7 @@ public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
   }

   @Override
-  public void reapAllLocks() throws IOException {
+  public void reapAllWriteLocks() throws IOException {
     throw new UnsupportedOperationException(
       "Lock reaping is not supported for ZK based read locks");
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java
index 0a74f30..1f93272 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;

 /**
  * ZooKeeper based write lock:
@@ -48,7 +49,7 @@ public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
   */
  @Override
  protected String getLockPath(String createdZNode, List<String> children)
-      throws IOException, InterruptedException {
+      throws IOException {
    TreeSet<String> sortedChildren =
        new TreeSet<String>(ZNodeComparator.COMPARATOR);
    sortedChildren.addAll(children);
@@ -66,10 +67,74 @@ public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
  }

  /**
+   * Visits the write locks and lock attempts with the given MetadataHandler.
+   * @throws IOException If there is an unrecoverable ZooKeeper error
+   */
+  public void visitLocks(MetadataHandler handler) throws IOException {
+    List<String> children;
+    try {
+      children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
+    } catch (KeeperException e) {
+      LOG.error("Unexpected ZooKeeper error when listing children", e);
+      throw new IOException("Unexpected ZooKeeper exception", e);
+    }
+    if (children.size() > 0) {
+      String currentLockHolder = getLockPath("lock-9999999999", children);
+      for (String child : children) {
+        String znode = ZKUtil.joinZNode(parentLockNode, child);
+        if (child.equals(currentLockHolder)) {
+          LOG.info("Current lock holder is: " + child);
+        }
+        handleLockMetadata(znode, handler);
+      }
+    }
+  }
+
+  /**
+   * Will delete all lock znodes (read and write) which are "expired" according to the
+   * timeout. The assumption is that the clock skew between ZooKeeper and this
+   * server is negligible.
+   * Referred to in the ZooKeeper recipes as "Revocable Shared Locks with Freaking Laser Beams"
+   * (http://zookeeper.apache.org/doc/trunk/recipes.html).
+   */
+  //TODO: we can bring this and reapAllWriteLocks() to the InterProcessLock interface.
+  public void reapExpiredLocks(long timeout) throws IOException {
+    List<String> children;
+    try {
+      children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
+    } catch (KeeperException e) {
+      LOG.error("Unexpected ZooKeeper error when listing children", e);
+      throw new IOException("Unexpected ZooKeeper exception", e);
+    }
+
+    KeeperException deferred = null;
+    Stat stat = new Stat();
+    long expireDate = System.currentTimeMillis() - timeout; //we are using cTime in zookeeper
+    for (String child : children) {
+      String znode = ZKUtil.joinZNode(parentLockNode, child);
+      try {
+        ZKUtil.getDataNoWatch(zkWatcher, znode, stat);
+        if (stat.getCtime() < expireDate) {
+          LOG.info("Reaping lock for znode:" + znode);
+          ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
+        }
+      } catch (KeeperException ex) {
+        LOG.warn("Error reaping the znode for write lock: " + znode, ex);
+        deferred = ex;
+      }
+
+    }
+    if (deferred != null) {
+      throw new IOException("ZK exception while reaping locks", deferred);
+    }
+  }
+
+  /**
+   * Will delete all WRITE locks and lock attempts.
    * Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams"
    * (http://zookeeper.apache.org/doc/trunk/recipes.html).
    */
-  public void reapAllLocks() throws IOException {
+  public void reapAllWriteLocks() throws IOException {
    List<String> children;
    try {
      children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 17e4852..8f485d9 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -34,7 +34,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -68,6 +73,8 @@ import org.apache.hadoop.hbase.io.hfile.TestHFile;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -85,8 +92,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.Ignore;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;

@@ -1325,7 +1332,7 @@ public class TestHBaseFsck {
     // TODO: fixHdfsHoles does not work against splits, since the parent dir lingers on
     // for some time until children references are deleted. HBCK erroneously sees this as
     // overlapping regions
-    HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, null);
+    HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, false, null);
     assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported

     // assert that the split META entry is still there.
@@ -1387,7 +1394,7 @@ public class TestHBaseFsck {
         ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN}); //no LINGERING_SPLIT_PARENT

     // now fix it. The fix should not revert the region split, but add daughters to META
-    hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, null);
+    hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, false, null);
     assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
         ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN});
@@ -1962,6 +1969,67 @@ public class TestHBaseFsck {
     }
   }

+  @Test(timeout=30000)
+  public void testCheckTableLocks() throws Exception {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(0);
+    EnvironmentEdgeManager.injectEdge(edge);
+    // check no errors
+    HBaseFsck hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+
+    ServerName mockName = new ServerName("localhost", 60000, 1);
+
+    // obtain one lock
+    final TableLockManager tableLockManager = TableLockManager.createTableLockManager(conf, TEST_UTIL.getZooKeeperWatcher(), mockName);
+    TableLock writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks");
+    writeLock.acquire();
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck); // the lock has not expired yet, no problems
+
+    edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT,
+        TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let the table lock expire
+
+    hbck = doFsck(conf, false);
+    assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK});
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    new Thread() {
+      public void run() {
+        TableLock writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks");
+        try {
+          latch.countDown();
+          writeLock.acquire();
+        } catch (IOException ex) {
+        }
+      }
+    }.start();
+
+    latch.await(); // wait until the thread starts
+    Threads.sleep(300); // wait some more to ensure writeLock.acquire() is called
+
+    hbck = doFsck(conf, false);
+    assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK}); // still one expired, one not-expired
+
+    edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT,
+        TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let the table lock expire
+
+    hbck = doFsck(conf, false);
+    assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK, ERROR_CODE.EXPIRED_TABLE_LOCK}); // both are expired
+
+    conf.setLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, 1); // reaping from ZKInterProcessWriteLock uses the znode cTime,
+    // which is not injectable through EnvironmentEdge
+    Threads.sleep(10);
+    hbck = doFsck(conf, true); // now fix both cases
+
+    hbck = doFsck(conf, false);
+    assertNoErrors(hbck);
+
+    // ensure that the locks are deleted
+    writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "should acquire without blocking");
+    writeLock.acquire(); // this should not block
+    writeLock.release(); // release for clean state
+  }
+
   @org.junit.Rule
   public TestName name = new TestName();
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index 7bc7761..e4e1309 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -38,13 +38,13 @@ public class HbckTestingUtil {
   public static HBaseFsck doFsck(
       Configuration conf, boolean fix, String table) throws Exception {
-    return doFsck(conf, fix, fix, fix, fix,fix, fix, fix, fix, fix, table);
+    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
   }

   public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
       boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
       boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
-      boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, String table) throws Exception {
+      boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, String table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     fsck.connect();
     fsck.setDisplayFullReport(); // i.e. -details
@@ -58,6 +58,7 @@ public class HbckTestingUtil {
     fsck.setFixVersionFile(fixVersionFile);
     fsck.setFixReferenceFiles(fixReferenceFiles);
     fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
+    fsck.setFixTableLocks(fixTableLocks);
     if (table != null) {
       fsck.includeTable(table);
     }
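
The sketches below illustrate the APIs this patch touches; they are editor's illustrations, not part of the diff itself. First, the new createTime field round-trips like any other optional int64 protobuf field, so old readers simply ignore field 6 while new readers can test for its presence. A minimal sketch against the regenerated ZooKeeperProtos (the demo class name is hypothetical):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock;

    public class TableLockCreateTimeDemo {
      public static void main(String[] args) throws Exception {
        TableLock lock = TableLock.newBuilder()
            .setTableName(ByteString.copyFromUtf8("foo"))
            .setThreadId(Thread.currentThread().getId())
            .setIsShared(false)
            .setPurpose("demo")
            .setCreateTime(System.currentTimeMillis()) // field 6, new in this patch
            .build();

        // Serialize and parse back; hasCreateTime() distinguishes "absent" from 0L.
        TableLock parsed = TableLock.newBuilder().mergeFrom(lock.toByteArray()).build();
        System.out.println(parsed.hasCreateTime() + " / " + parsed.getCreateTime());
      }
    }

Note that TableLockManager additionally wraps the serialized message with a PB magic prefix (prependPBMagic), which is why the hbck-visible fromBytes() strips lengthOfPBMagic() bytes before calling mergeFrom.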
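The comparator rewrite in ZKInterProcessLockBase is not just cosmetic: with sequence ids widened to long, the old "return seq1 - seq2" idiom would have to truncate the difference to int, which can flip its sign. A self-contained sketch of the failure mode (hypothetical demo class):

    public class SequenceIdCompareDemo {
      public static void main(String[] args) {
        long seq1 = 0L;
        long seq2 = 3000000000L; // still a 10-digit, 0-padded znode suffix

        int broken = (int) (seq1 - seq2); // -3000000000 truncated to 1294967296: wrong sign
        int fixed = (seq1 == seq2) ? 0 : (seq1 < seq2 ? -1 : 1); // the patch's branch compare

        System.out.println(broken + " vs " + fixed);
      }
    }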
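The expiry decision in reapExpiredLocks(long) needs no coordination: it is a single comparison between the znode's creation time (Stat.getCtime(), assigned by the ZooKeeper server) and now - timeout, which is why the javadoc calls out clock skew as the only assumption. The predicate in isolation (names are illustrative):

    public final class LockExpiry {
      private LockExpiry() {}

      /** True if a znode created at ctimeMs is older than timeoutMs relative to nowMs. */
      static boolean isExpired(long ctimeMs, long nowMs, long timeoutMs) {
        long expireDate = nowMs - timeoutMs; // same arithmetic as reapExpiredLocks()
        return ctimeMs < expireDate;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long tenMinutes = 600 * 1000L; // DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS
        System.out.println(isExpired(now - tenMinutes - 1, now, tenMinutes)); // true
        System.out.println(isExpired(now - 1000, now, tenMinutes));           // false
      }
    }

This is also why testCheckTableLocks has to fall back to conf.setLong(TABLE_LOCK_EXPIRE_TIMEOUT, 1) plus a real sleep for the fix phase: cTime comes from ZooKeeper and cannot be moved by EnvironmentEdgeManager.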
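Finally, the end-to-end flow that hbck's checkAndFixTableLocks() drives through TableLockChecker can be reproduced directly against the TableLockManager API. A hedged sketch, assuming a connected ZooKeeperWatcher and that MetadataHandler is the InterProcessLock inner interface used above; the class and server name are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.master.TableLockManager;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class TableLockSweep {
      public static void sweep(Configuration conf, ZooKeeperWatcher zkw) throws Exception {
        ServerName me = new ServerName("hbck-host", 60000, System.currentTimeMillis());
        TableLockManager mgr = TableLockManager.createTableLockManager(conf, zkw, me);

        // Report every lock and lock attempt, like hbck's check phase.
        mgr.visitLocks(new MetadataHandler() {
          @Override
          public void handleMetadata(byte[] metadata) {
            System.out.println(TableLockManager.fromBytes(metadata)); // made public for hbck
          }
        });

        // Delete locks older than hbase.table.lock.expire.ms, like hbck's fix phase.
        mgr.reapExpiredLocks();
      }
    }

From the command line the equivalent is "hbase hbck -fixTableLocks", which the -repair shortcut now also pulls in, per the usage text above.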