diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java index 93999c6..4b090ea 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java @@ -4902,6 +4902,807 @@ public final class ZooKeeperProtos { // @@protoc_insertion_point(class_scope:ReplicationLock) } + public interface TableLockOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bytes tableName = 1; + boolean hasTableName(); + com.google.protobuf.ByteString getTableName(); + + // required .ServerName lockOwner = 2; + boolean hasLockOwner(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder(); + + // required int64 threadId = 3; + boolean hasThreadId(); + long getThreadId(); + + // required bool isShared = 4; + boolean hasIsShared(); + boolean getIsShared(); + + // optional string purpose = 5; + boolean hasPurpose(); + String getPurpose(); + } + public static final class TableLock extends + com.google.protobuf.GeneratedMessage + implements TableLockOrBuilder { + // Use TableLock.newBuilder() to construct. + private TableLock(Builder builder) { + super(builder); + } + private TableLock(boolean noInit) {} + + private static final TableLock defaultInstance; + public static TableLock getDefaultInstance() { + return defaultInstance; + } + + public TableLock getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable; + } + + private int bitField0_; + // required bytes tableName = 1; + public static final int TABLENAME_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString tableName_; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + + // required .ServerName lockOwner = 2; + public static final int LOCKOWNER_FIELD_NUMBER = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_; + public boolean hasLockOwner() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() { + return lockOwner_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() { + return lockOwner_; + } + + // required int64 threadId = 3; + public static final int THREADID_FIELD_NUMBER = 3; + private long threadId_; + public boolean hasThreadId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getThreadId() { + return threadId_; + } + + // required bool isShared = 4; + public static final int ISSHARED_FIELD_NUMBER = 4; + private boolean isShared_; + public boolean hasIsShared() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getIsShared() { + return isShared_; + } + + // optional 
string purpose = 5; + public static final int PURPOSE_FIELD_NUMBER = 5; + private java.lang.Object purpose_; + public boolean hasPurpose() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getPurpose() { + java.lang.Object ref = purpose_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + purpose_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getPurposeBytes() { + java.lang.Object ref = purpose_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + purpose_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + tableName_ = com.google.protobuf.ByteString.EMPTY; + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + threadId_ = 0L; + isShared_ = false; + purpose_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTableName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasLockOwner()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasThreadId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasIsShared()) { + memoizedIsInitialized = 0; + return false; + } + if (!getLockOwner().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, lockOwner_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, threadId_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(4, isShared_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, getPurposeBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, lockOwner_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, threadId_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, isShared_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, getPurposeBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean 
equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) obj; + + boolean result = true; + result = result && (hasTableName() == other.hasTableName()); + if (hasTableName()) { + result = result && getTableName() + .equals(other.getTableName()); + } + result = result && (hasLockOwner() == other.hasLockOwner()); + if (hasLockOwner()) { + result = result && getLockOwner() + .equals(other.getLockOwner()); + } + result = result && (hasThreadId() == other.hasThreadId()); + if (hasThreadId()) { + result = result && (getThreadId() + == other.getThreadId()); + } + result = result && (hasIsShared() == other.hasIsShared()); + if (hasIsShared()) { + result = result && (getIsShared() + == other.getIsShared()); + } + result = result && (hasPurpose() == other.hasPurpose()); + if (hasPurpose()) { + result = result && getPurpose() + .equals(other.getPurpose()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasTableName()) { + hash = (37 * hash) + TABLENAME_FIELD_NUMBER; + hash = (53 * hash) + getTableName().hashCode(); + } + if (hasLockOwner()) { + hash = (37 * hash) + LOCKOWNER_FIELD_NUMBER; + hash = (53 * hash) + getLockOwner().hashCode(); + } + if (hasThreadId()) { + hash = (37 * hash) + THREADID_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getThreadId()); + } + if (hasIsShared()) { + hash = (37 * hash) + ISSHARED_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsShared()); + } + if (hasPurpose()) { + hash = (37 * hash) + PURPOSE_FIELD_NUMBER; + hash = (53 * hash) + getPurpose().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + java.io.InputStream input, + 
com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLockOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getLockOwnerFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + tableName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + if (lockOwnerBuilder_ == null) { + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } else { + lockOwnerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + threadId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + isShared_ = false; + bitField0_ = (bitField0_ & ~0x00000008); + purpose_ 
= ""; + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.tableName_ = tableName_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (lockOwnerBuilder_ == null) { + result.lockOwner_ = lockOwner_; + } else { + result.lockOwner_ = lockOwnerBuilder_.build(); + } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.threadId_ = threadId_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.isShared_ = isShared_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.purpose_ = purpose_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance()) return this; + if (other.hasTableName()) { + setTableName(other.getTableName()); + } + if (other.hasLockOwner()) { + mergeLockOwner(other.getLockOwner()); + } + if (other.hasThreadId()) { + setThreadId(other.getThreadId()); + } + if (other.hasIsShared()) { + setIsShared(other.getIsShared()); + } + if (other.hasPurpose()) { + setPurpose(other.getPurpose()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTableName()) { + + return false; + } + if (!hasLockOwner()) { + + return false; + } + if (!hasThreadId()) { + + return false; + } + if (!hasIsShared()) { + + return false; + } + if 
(!getLockOwner().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + tableName_ = input.readBytes(); + break; + } + case 18: { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + if (hasLockOwner()) { + subBuilder.mergeFrom(getLockOwner()); + } + input.readMessage(subBuilder, extensionRegistry); + setLockOwner(subBuilder.buildPartial()); + break; + } + case 24: { + bitField0_ |= 0x00000004; + threadId_ = input.readInt64(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + isShared_ = input.readBool(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + purpose_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required bytes tableName = 1; + private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + public Builder setTableName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + tableName_ = value; + onChanged(); + return this; + } + public Builder clearTableName() { + bitField0_ = (bitField0_ & ~0x00000001); + tableName_ = getDefaultInstance().getTableName(); + onChanged(); + return this; + } + + // required .ServerName lockOwner = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> lockOwnerBuilder_; + public boolean hasLockOwner() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() { + if (lockOwnerBuilder_ == null) { + return lockOwner_; + } else { + return lockOwnerBuilder_.getMessage(); + } + } + public Builder setLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (lockOwnerBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + lockOwner_ = value; + onChanged(); + } else { + lockOwnerBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder setLockOwner( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) { + if (lockOwnerBuilder_ == null) { + lockOwner_ = builderForValue.build(); + onChanged(); + } else { + 
lockOwnerBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder mergeLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (lockOwnerBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + lockOwner_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) { + lockOwner_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(lockOwner_).mergeFrom(value).buildPartial(); + } else { + lockOwner_ = value; + } + onChanged(); + } else { + lockOwnerBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder clearLockOwner() { + if (lockOwnerBuilder_ == null) { + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + onChanged(); + } else { + lockOwnerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getLockOwnerBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getLockOwnerFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() { + if (lockOwnerBuilder_ != null) { + return lockOwnerBuilder_.getMessageOrBuilder(); + } else { + return lockOwner_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> + getLockOwnerFieldBuilder() { + if (lockOwnerBuilder_ == null) { + lockOwnerBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>( + lockOwner_, + getParentForChildren(), + isClean()); + lockOwner_ = null; + } + return lockOwnerBuilder_; + } + + // required int64 threadId = 3; + private long threadId_ ; + public boolean hasThreadId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getThreadId() { + return threadId_; + } + public Builder setThreadId(long value) { + bitField0_ |= 0x00000004; + threadId_ = value; + onChanged(); + return this; + } + public Builder clearThreadId() { + bitField0_ = (bitField0_ & ~0x00000004); + threadId_ = 0L; + onChanged(); + return this; + } + + // required bool isShared = 4; + private boolean isShared_ ; + public boolean hasIsShared() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getIsShared() { + return isShared_; + } + public Builder setIsShared(boolean value) { + bitField0_ |= 0x00000008; + isShared_ = value; + onChanged(); + return this; + } + public Builder clearIsShared() { + bitField0_ = (bitField0_ & ~0x00000008); + isShared_ = false; + onChanged(); + return this; + } + + // optional string purpose = 5; + private java.lang.Object purpose_ = ""; + public boolean hasPurpose() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getPurpose() { + java.lang.Object ref = purpose_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + purpose_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setPurpose(String value) { + if 
(value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000010;
+        purpose_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearPurpose() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        purpose_ = getDefaultInstance().getPurpose();
+        onChanged();
+        return this;
+      }
+      void setPurpose(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000010;
+        purpose_ = value;
+        onChanged();
+      }
+
+      // @@protoc_insertion_point(builder_scope:TableLock)
+    }
+
+    static {
+      defaultInstance = new TableLock(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:TableLock)
+  }
+
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_RootRegionServer_descriptor;
   private static
@@ -4952,6 +5753,11 @@ public final class ZooKeeperProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_ReplicationLock_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_TableLock_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_TableLock_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -4980,8 +5786,11 @@ public final class ZooKeeperProtos {
       "tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
       "BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" +
       "ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" +
-      "ner\030\001 \002(\tBE\n*org.apache.hadoop.hbase.pro",
-      "tobuf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+      "ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \002(",
+      "\014\022\036\n\tlockOwner\030\002 \002(\0132\013.ServerName\022\020\n\010thr" +
+      "eadId\030\003 \002(\003\022\020\n\010isShared\030\004 \002(\010\022\017\n\007purpose" +
+      "\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" +
+      "uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5068,6 +5877,14 @@ public final class ZooKeeperProtos {
               new java.lang.String[] { "LockOwner", },
               org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.class,
               org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.Builder.class);
+          internal_static_TableLock_descriptor =
+            getDescriptor().getMessageTypes().get(10);
+          internal_static_TableLock_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_TableLock_descriptor,
+              new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", },
+              org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class,
+              org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class);
           return null;
         }
       };
diff --git hbase-protocol/src/main/protobuf/ZooKeeper.proto hbase-protocol/src/main/protobuf/ZooKeeper.proto
index e20e17b..f71a35b 100644
--- hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -133,3 +133,14 @@ message ReplicationHLogPosition {
 message ReplicationLock {
   required string lockOwner = 1;
 }
+
+/**
+ * Metadata associated with a table lock in zookeeper
+ */
+message TableLock {
+  required bytes tableName = 1;
+  required ServerName lockOwner = 2;
+  required int64 threadId = 3;
+  required bool isShared = 4;
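+  // Free-form, human-readable note on why the lock is held; the handlers in
+  // this patch pass an EventType name such as C_M_CREATE_TABLE here.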
+  optional string purpose = 5;
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java
new file mode 100644
index 0000000..0e2301c
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An interface for an application-specific lock.
+ */
+@InterfaceAudience.Private
+public interface InterProcessLock {
+
+  /**
+   * Acquire the lock, waiting indefinitely until the lock is released or
+   * the thread is interrupted.
+   * @throws IOException If there is an unrecoverable error acquiring the lock
+   * @throws InterruptedException If current thread is interrupted while
+   *                              waiting for the lock
+   */
+  public void acquire() throws IOException, InterruptedException;
+
+  /**
+   * Acquire the lock within a wait time.
+   * @param timeoutMs The maximum time (in milliseconds) to wait for the lock,
+   *                  -1 to wait indefinitely
+   * @return True if the lock was acquired, false if waiting time elapsed
+   *         before the lock was acquired
+   * @throws IOException If there is an unrecoverable error (e.g., when talking
+   *                     to a lock service) while acquiring the lock
+   * @throws InterruptedException If the thread is interrupted while waiting to
+   *                              acquire the lock
+   */
+  public boolean tryAcquire(long timeoutMs)
+  throws IOException, InterruptedException;
+
+  /**
+   * Release the lock.
+   * @throws IOException If there is an unrecoverable error releasing the lock
+   * @throws InterruptedException If the thread is interrupted while releasing
+   *                              the lock
+   */
+  public void release() throws IOException, InterruptedException;
+
+  /**
+   * If supported, attempts to reap all the locks of this type by forcefully
+   * deleting the locks. Lock reaping differs from coordinated lock revocation
+   * in that there is no coordination, and the behavior is undefined if the
+   * lock holder is still alive.
+   * @throws IOException If there is an unrecoverable error reaping the locks
+   */
+  public void reapAllLocks() throws IOException;
+
+  /**
+   * An interface for objects that process lock metadata.
+   */
+  public static interface MetadataHandler {
+
+    /**
+     * Called after lock metadata is successfully read from a distributed
+     * lock service. This method may contain any procedures for, e.g.,
+     * printing the metadata in a human-readable format.
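+     * For example, a minimal handler might just log the raw bytes (a sketch
+     * only; it assumes a commons-logging Log named LOG and an import of
+     * org.apache.hadoop.hbase.util.Bytes; the metadata encoding is up to the
+     * lock's creator -- ZKTableLockManager, for instance, stores a serialized
+     * ZooKeeperProtos.TableLock message):
+     * <pre>
+     * MetadataHandler handler = new MetadataHandler() {
+     *   public void handleMetadata(byte[] metadata) {
+     *     LOG.debug("Lock metadata: " + Bytes.toStringBinary(metadata));
+     *   }
+     * };
+     * </pre>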
+     * @param metadata The metadata
+     */
+    public void handleMetadata(byte[] metadata);
+  }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java
new file mode 100644
index 0000000..cc70f42
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An interface for a distributed reader-writer lock.
+ */
+@InterfaceAudience.Private
+public interface InterProcessReadWriteLock {
+
+  /**
+   * Obtain a reader lock containing given metadata.
+   * @param metadata Serialized lock metadata (this may contain information
+   *                 such as the process owning the lock or the purpose for
+   *                 which the lock was acquired). Must not be null.
+   * @return An instantiated InterProcessLock instance
+   */
+  public InterProcessLock readLock(byte[] metadata);
+
+  /**
+   * Obtain a writer lock containing given metadata.
+   * @param metadata See documentation of metadata parameter in readLock()
+   * @return An instantiated InterProcessLock instance
+   */
+  public InterProcessLock writeLock(byte[] metadata);
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java
new file mode 100644
index 0000000..a3bbbba
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class LockTimeoutException extends IOException {
+
+  private static final long serialVersionUID = -1770764924258999825L;
+
+  /** Default constructor */
+  public LockTimeoutException() {
+    super();
+  }
+
+  public LockTimeoutException(String s) {
+    super(s);
+  }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 98535ac..b734a01 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -33,10 +33,10 @@ import org.cloudera.htrace.Trace;
 
 /**
  * Abstract base class for all HBase event handlers. Subclasses should
- * implement the {@link #process()} method. Subclasses should also do all
- * necessary checks up in their constructor if possible -- check table exists,
- * is disabled, etc. -- so they fail fast rather than later when process is
- * running. Do it this way because process be invoked directly but event
+ * implement the {@link #process()} and {@link #prepare()} methods. Subclasses
+ * should also do all necessary checks up in their prepare() if possible -- check
+ * table exists, is disabled, etc. -- so they fail fast rather than later when process
+ * is running. Do it this way because process may be invoked directly, but event
  * handlers are also
  * run in an executor context -- i.e. asynchronously -- and in this case,
  * exceptions thrown at process time will not be seen by the invoker, not till
@@ -102,7 +102,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
   * originated and then where its destined -- e.g. RS2ZK_ prefix means the
   * event came from a regionserver destined for zookeeper -- and then what
   * the even is; e.g. REGION_OPENING.
-  * 
+  *
   * We give the enums indices so we can add types later and keep them
   * grouped together rather than have to add them always to the end as we
   * would have to if we used raw enum ordinals.
@@ -195,6 +195,19 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
       getInt("hbase.master.event.waiting.time", 1000);
   }
 
+  /**
+   * Event handlers should do all the necessary checks in this method (rather than
+   * in the constructor, or in process()) so that the caller, which is mostly executed
+   * in the ipc context, can fail fast. Process is executed asynchronously from the
+   * client ipc, so this method gives a quick chance to do some basic checks.
+   * Should be called after constructing the EventHandler, and before process().
+   * @return the instance of this class
+   * @throws Exception when something goes wrong
+   */
+  public EventHandler prepare() throws Exception {
+    return this;
+  }
+
   public void run() {
     Span chunk = Trace.startSpan(Thread.currentThread().getName(), parent,
           Sampler.ALWAYS);
@@ -275,7 +288,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
   public synchronized void setListener(EventHandlerListener listener) {
     this.listener = listener;
   }
-  
+
   @Override
   public String toString() {
     return "Event #" + getSeqid() +
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 16ecfc2..70fc871 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -112,6 +112,8 @@ public class AssignmentManager extends ZooKeeperListener {
 
   private LoadBalancer balancer;
 
+  private final TableLockManager tableLockManager;
+
   final private KeyLocker<String> locker = new KeyLocker<String>();
 
   /**
@@ -192,7 +194,8 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   public AssignmentManager(Server server, ServerManager serverManager,
       CatalogTracker catalogTracker, final LoadBalancer balancer,
-      final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException {
+      final ExecutorService service, MetricsMaster metricsMaster,
+      final TableLockManager tableLockManager) throws KeeperException, IOException {
     super(server.getZooKeeper());
     this.server = server;
     this.serverManager = serverManager;
@@ -228,6 +231,7 @@ public class AssignmentManager extends ZooKeeperListener {
     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS,
             threadFactory);
+    this.tableLockManager = tableLockManager;
   }
 
   void startTimeOutMonitor() {
@@ -301,7 +305,7 @@ public class AssignmentManager extends ZooKeeperListener {
   public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
       throws IOException {
     List<HRegionInfo> hris =
-      MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
+      MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
     Integer pending = 0;
     for (HRegionInfo hri : hris) {
       String name = hri.getEncodedName();
@@ -1258,7 +1262,7 @@ public class AssignmentManager extends ZooKeeperListener {
    */
   public void regionOffline(final HRegionInfo regionInfo) {
     regionStates.regionOffline(regionInfo);
-    
+
     removeClosedRegion(regionInfo);
     // remove the region plan as well just in case.
clearRegionPlan(regionInfo); } @@ -2408,8 +2412,8 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("The table " + tableName + " is in DISABLING state. Hence recovering by moving the table" + " to DISABLED state."); - new DisableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker, + this, tableLockManager, true).prepare().process(); } } } @@ -2434,7 +2438,7 @@ public class AssignmentManager extends ZooKeeperListener { // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + catalogTracker, this, tableLockManager, true).prepare().process(); } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 9ccff6f..4d5c9f9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -97,7 +97,6 @@ import org.apache.hadoop.hbase.master.handler.ModifyTableHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; -import org.apache.hadoop.hbase.master.handler.TableEventHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -302,6 +301,9 @@ Server { private TableDescriptors tableDescriptors; + // Table level lock manager for schema changes + private TableLockManager tableLockManager; + // Time stamps for when a hmaster was started and when it became active private long masterStartTime; private long masterActiveTime; @@ -548,7 +550,8 @@ Server { this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); this.loadBalancerTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.balancer, this.executorService, this.metricsMaster); + this.catalogTracker, this.balancer, this.executorService, this.metricsMaster, + this.tableLockManager); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, @@ -682,6 +685,13 @@ Server { startServiceThreads(); } + //Initialize table lock manager, and ensure that all write locks held previously + //are invalidated + this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName); + if (!masterRecovery) { + this.tableLockManager.reapAllTableWriteLocks(); + } + // Wait for region servers to report in. 
this.serverManager.waitForRegionServers(status); // Check zk for region servers that are up but didn't register @@ -1487,7 +1497,7 @@ Server { this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, - newRegions, catalogTracker, assignmentManager)); + newRegions, this).prepare()); if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } @@ -1554,7 +1564,7 @@ Server { if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare()); if (cpHost != null) { cpHost.postDeleteTable(tableName); } @@ -1608,7 +1618,9 @@ Server { return; } } - new TableAddFamilyHandler(tableName, column, this, this).process(); + //TODO: we should process this (and some others) in an executor + new TableAddFamilyHandler(tableName, column, this, this) + .prepare().process(); if (cpHost != null) { cpHost.postAddColumn(tableName, column); } @@ -1636,7 +1648,8 @@ Server { return; } } - new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this) + .prepare().process(); if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } @@ -1663,7 +1676,7 @@ Server { return; } } - new TableDeleteFamilyHandler(tableName, columnName, this, this).process(); + new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process(); if (cpHost != null) { cpHost.postDeleteColumn(tableName, columnName); } @@ -1687,7 +1700,7 @@ Server { cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false).prepare()); if (cpHost != null) { cpHost.postEnableTable(tableName); } @@ -1711,7 +1724,7 @@ Server { cpHost.preDisableTable(tableName); } this.executorService.submit(new DisableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false).prepare()); if (cpHost != null) { cpHost.postDisableTable(tableName); } @@ -1771,8 +1784,7 @@ Server { if (cpHost != null) { cpHost.preModifyTable(tableName, descriptor); } - new ModifyTableHandler(tableName, descriptor, this, this).process(); - + new ModifyTableHandler(tableName, descriptor, this, this).prepare().process(); if (cpHost != null) { cpHost.postModifyTable(tableName, descriptor); } @@ -2034,12 +2046,17 @@ Server { return this.assignmentManager; } + @Override + public TableLockManager getTableLockManager() { + return this.tableLockManager; + } + public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() { return rsFatals; } public void shutdown() throws IOException { - if (spanReceiverHost != null) { + if (spanReceiverHost != null) { spanReceiverHost.closeReceivers(); } if (cpHost != null) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index f9aa860..c336f42 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; -import com.google.protobuf.Service; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import com.google.protobuf.Service; + /** * Services Master supplies */ @@ -56,6 +57,11 @@ public interface MasterServices extends Server { public ExecutorService getExecutorService(); /** + * @return Master's instance of {@link TableLockManager} + */ + public TableLockManager getTableLockManager(); + + /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. * @throws TableNotDisabledException diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java new file mode 100644 index 0000000..668c379 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java @@ -0,0 +1,387 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.InterProcessLock; +import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler; +import org.apache.hadoop.hbase.InterProcessReadWriteLock; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.LockTimeoutException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * A manager for distributed table level locks. 
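+ *
+ * <p>Expected usage, sketched from the handlers in this patch (writeLock()
+ * only creates the lock object; acquire() blocks until the lock is held or
+ * the configured timeout elapses, in which case it throws
+ * LockTimeoutException):
+ * <pre>
+ * TableLock lock = tableLockManager.writeLock(tableName, "table operation");
+ * lock.acquire();
+ * try {
+ *   // perform the schema change while holding the exclusive lock
+ * } finally {
+ *   lock.release();
+ * }
+ * </pre>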
+ */
+@InterfaceAudience.Private
+public abstract class TableLockManager {
+
+  private static final Log LOG = LogFactory.getLog(TableLockManager.class);
+
+  /** Configuration key for enabling table-level locks for schema changes */
+  public static final String TABLE_LOCK_ENABLE =
+    "hbase.table.lock.enable";
+
+  /** by default we should enable table-level locks for schema changes */
+  private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
+
+  /** Configuration key for the timeout when trying to acquire table write locks */
+  protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
+    "hbase.table.write.lock.timeout.ms";
+
+  /** Configuration key for the timeout when trying to acquire table read locks */
+  protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
+    "hbase.table.read.lock.timeout.ms";
+
+  protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
+    600 * 1000; //10 min default
+
+  protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
+    600 * 1000; //10 min default
+
+  /**
+   * A distributed lock for a table.
+   */
+  @InterfaceAudience.Private
+  public static interface TableLock {
+    /**
+     * Acquire the lock, with the configured lock timeout.
+     * @throws LockTimeoutException If unable to acquire a lock within a specified
+     * time period (if any)
+     * @throws IOException If an unrecoverable error occurs
+     */
+    public void acquire() throws IOException;
+
+    /**
+     * Release the lock already held.
+     * @throws IOException If there is an unrecoverable error releasing the lock
+     */
+    public void release() throws IOException;
+  }
+
+  /**
+   * Returns a TableLock for locking the table for exclusive access
+   * @param tableName Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return A new TableLock object for acquiring a write lock
+   */
+  public abstract TableLock writeLock(byte[] tableName, String purpose);
+
+  /**
+   * Returns a TableLock for locking the table for shared access among read-lock holders
+   * @param tableName Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return A new TableLock object for acquiring a read lock
+   */
+  public abstract TableLock readLock(byte[] tableName, String purpose);
+
+  /**
+   * Force releases all table write locks and lock attempts, even if this thread does
+   * not own the lock. The behavior of lock holders that still believe they hold
+   * the lock is undefined. This should be used carefully and only when
+   * we can ensure that all write-lock holders have died. For example, if only
+   * the master can hold write locks, then we can reap its locks when the backup
+   * master starts.
+   */
+  public abstract void reapAllTableWriteLocks() throws IOException;
+
+  /**
+   * Called after a table has been deleted, and after the table lock is released.
+   * TableLockManager should do cleanup for the table state.
+   * @param tableName name of the table
+   * @throws IOException If there is an unrecoverable error cleaning up the table state
+   */
+  public abstract void tableDeleted(byte[] tableName)
+      throws IOException;
+
+  /**
+   * Creates and returns a TableLockManager according to the configuration
+   */
+  public static TableLockManager createTableLockManager(Configuration conf,
+      ZooKeeperWatcher zkWatcher, ServerName serverName) {
+    // Initialize table level lock manager for schema changes, if enabled.
+    if (conf.getBoolean(TABLE_LOCK_ENABLE,
+        DEFAULT_TABLE_LOCK_ENABLE)) {
+      int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS,
+          DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
+      int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS,
+          DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
+      return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs);
+    }
+
+    return new NullTableLockManager();
+  }
+
+  /**
+   * A null implementation
+   */
+  @InterfaceAudience.Private
+  static class NullTableLockManager extends TableLockManager {
+    static class NullTableLock implements TableLock {
+      @Override
+      public void acquire() throws IOException {
+      }
+      @Override
+      public void release() throws IOException {
+      }
+    }
+    @Override
+    public TableLock writeLock(byte[] tableName, String purpose) {
+      return new NullTableLock();
+    }
+    @Override
+    public TableLock readLock(byte[] tableName, String purpose) {
+      return new NullTableLock();
+    }
+    @Override
+    public void reapAllTableWriteLocks() throws IOException {
+    }
+    @Override
+    public void tableDeleted(byte[] tableName) throws IOException {
+    }
+  }
+
+  /**
+   * ZooKeeper based TableLockManager
+   */
+  @InterfaceAudience.Private
+  private static class ZKTableLockManager extends TableLockManager {
+
+    private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
+      @Override
+      public void handleMetadata(byte[] ownerMetadata) {
+        if (!LOG.isDebugEnabled()) {
+          return;
+        }
+        ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
+        if (data == null) {
+          return;
+        }
+        LOG.debug("Table is locked by: " +
+            String.format("[tableName=%s, lockOwner=%s, threadId=%s, " +
+                "purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()),
+                ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
+                data.getPurpose(), data.getIsShared()));
+      }
+    };
+
+    private static class TableLockImpl implements TableLock {
+      long lockTimeoutMs;
+      byte[] tableName;
+      String tableNameStr;
+      InterProcessLock lock;
+      boolean isShared;
+      ZooKeeperWatcher zkWatcher;
+      ServerName serverName;
+      String purpose;
+
+      public TableLockImpl(byte[] tableName, ZooKeeperWatcher zkWatcher,
+          ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
+        this.tableName = tableName;
+        tableNameStr = Bytes.toString(tableName);
+        this.zkWatcher = zkWatcher;
+        this.serverName = serverName;
+        this.lockTimeoutMs = lockTimeoutMs;
+        this.isShared = isShared;
+        this.purpose = purpose;
+      }
+
+      @Override
+      public void acquire() throws IOException {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempt to acquire table " + (isShared ? "read" : "write") +
+              " lock on :" + tableNameStr + " for:" + purpose);
+        }
+
+        lock = createTableLock();
+        try {
+          if (lockTimeoutMs == -1) {
+            // Wait indefinitely
+            lock.acquire();
+          } else {
+            if (!lock.tryAcquire(lockTimeoutMs)) {
+              throw new LockTimeoutException("Timed out acquiring " +
+                  (isShared ? "read" : "write") + " lock for table: " + tableNameStr +
+                  " for: " + purpose + " after " + lockTimeoutMs + " ms.");
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e);
+          Thread.currentThread().interrupt();
+          throw new InterruptedIOException("Interrupted acquiring a lock");
+        }
+        LOG.debug("Acquired table " + (isShared ? "read" : "write") +
+            " lock on :" + tableNameStr + " for:" + purpose);
+      }
+
+      @Override
+      public void release() throws IOException {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempt to release table " + (isShared ?
"read" : "write") + + " lock on :" + tableNameStr); + } + if (lock == null) { + throw new IllegalStateException("Table " + tableNameStr + + " is not locked!"); + } + + try { + lock.release(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while releasing a lock for " + tableNameStr); + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Released table lock on :" + tableNameStr); + } + } + + private InterProcessLock createTableLock() { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr); + + ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder() + .setTableName(ByteString.copyFrom(tableName)) + .setLockOwner(ProtobufUtil.toServerName(serverName)) + .setThreadId(Thread.currentThread().getId()) + .setPurpose(purpose) + .setIsShared(isShared).build(); + byte[] lockMetadata = toBytes(data); + + InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode, + METADATA_HANDLER); + return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata); + } + } + + private static byte[] toBytes(ZooKeeperProtos.TableLock data) { + return ProtobufUtil.prependPBMagic(data.toByteArray()); + } + + private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + if (bytes == null || bytes.length < pblen) { + return null; + } + try { + ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom( + bytes, pblen, bytes.length - pblen).build(); + return data; + } catch (InvalidProtocolBufferException ex) { + LOG.warn("Exception in deserialization", ex); + } + return null; + } + + private final ServerName serverName; + private final ZooKeeperWatcher zkWatcher; + private final long writeLockTimeoutMs; + private final long readLockTimeoutMs; + + /** + * Initialize a new manager for table-level locks. 
+   * @param zkWatcher the ZooKeeper watcher of the cluster
+   * @param serverName Address of the server responsible for acquiring and
+   * releasing the table-level locks
+   * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
+   * given table, or -1 for no timeout
+   * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
+   * given table, or -1 for no timeout
+   */
+  public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
+      ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) {
+    this.zkWatcher = zkWatcher;
+    this.serverName = serverName;
+    this.writeLockTimeoutMs = writeLockTimeoutMs;
+    this.readLockTimeoutMs = readLockTimeoutMs;
+  }
+
+  @Override
+  public TableLock writeLock(byte[] tableName, String purpose) {
+    return new TableLockImpl(tableName, zkWatcher,
+        serverName, writeLockTimeoutMs, false, purpose);
+  }
+
+  @Override
+  public TableLock readLock(byte[] tableName, String purpose) {
+    return new TableLockImpl(tableName, zkWatcher,
+        serverName, readLockTimeoutMs, true, purpose);
+  }
+
+  @Override
+  public void reapAllTableWriteLocks() throws IOException {
+    // get the table names
+    try {
+      List<String> tableNames;
+      try {
+        tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
+      } catch (KeeperException e) {
+        LOG.error("Unexpected ZooKeeper error when listing children", e);
+        throw new IOException("Unexpected ZooKeeper exception", e);
+      }
+
+      for (String tableName : tableNames) {
+        String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
+        ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
+            zkWatcher, tableLockZNode, null);
+        lock.writeLock(null).reapAllLocks();
+      }
+    } catch (IOException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      LOG.warn("Caught exception while reaping table write locks", ex);
+    }
+  }
+
+  @Override
+  public void tableDeleted(byte[] tableName) throws IOException {
+    // table write lock from DeleteHandler is already released, just delete the parent znode
+    String tableNameStr = Bytes.toString(tableName);
+    String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
+    try {
+      ZKUtil.deleteNode(zkWatcher, tableLockZNode);
+    } catch (KeeperException ex) {
+      if (ex.code() == KeeperException.Code.NOTEMPTY) {
+        // We might get this on rare occasions when a CREATE table or some other table operation
+        // is waiting to acquire the lock. In this case, the parent znode won't be deleted.
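// A minimal caller-side sketch of the lock manager above (illustrative, not
// part of the patch). The factory name createTableLockManager and the demo
// variables are assumptions; writeLock/acquire/release come from this hunk.
Configuration conf = HBaseConfiguration.create();
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(conf, "lock-demo", null);
ServerName serverName = new ServerName("host1,60000,1234567890");
TableLockManager lockManager =
    TableLockManager.createTableLockManager(conf, zkWatcher, serverName);
TableLock lock = lockManager.writeLock(Bytes.toBytes("t1"), "demo purpose");
lock.acquire();   // blocks; the ZK implementation throws LockTimeoutException
                  // when a configured timeout expires
try {
  // ... perform the DDL operation while holding the exclusive lock ...
} finally {
  lock.release(); // always release, even if the operation failed
}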
+ LOG.warn("Could not delete the znode for table locks because NOTEMPTY: " + + tableLockZNode); + return; + } + throw new IOException(ex); + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java index 0130f51..770057a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java @@ -49,6 +49,9 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.Threads; @@ -65,21 +68,29 @@ public class CreateTableHandler extends EventHandler { private Configuration conf; private final AssignmentManager assignmentManager; private final CatalogTracker catalogTracker; + private final TableLockManager tableLockManager; private final HRegionInfo [] newRegions; + private final TableLock tableLock; public CreateTableHandler(Server server, MasterFileSystem fileSystemManager, HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) - throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { + MasterServices masterServices) { super(server, EventType.C_M_CREATE_TABLE); this.fileSystemManager = fileSystemManager; this.hTableDescriptor = hTableDescriptor; this.conf = conf; this.newRegions = newRegions; - this.catalogTracker = catalogTracker; - this.assignmentManager = assignmentManager; + this.catalogTracker = masterServices.getCatalogTracker(); + this.assignmentManager = masterServices.getAssignmentManager(); + this.tableLockManager = masterServices.getTableLockManager(); + this.tableLock = this.tableLockManager.writeLock(this.hTableDescriptor.getName() + , EventType.C_M_CREATE_TABLE.toString()); + } + + public CreateTableHandler prepare() + throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { int timeout = conf.getInt("hbase.client.catalog.timeout", 10000); // Need META availability to create a table try { @@ -91,28 +102,40 @@ public class CreateTableHandler extends EventHandler { throw new IOException(e); } - String tableName = this.hTableDescriptor.getNameAsString(); - if (MetaReader.tableExists(catalogTracker, tableName)) { - throw new TableExistsException(tableName); - } - - // If we have multiple client threads trying to create the table at the - // same time, given the async nature of the operation, the table - // could be in a state where .META. table hasn't been updated yet in - // the process() function. - // Use enabling state to tell if there is already a request for the same - // table in progress. This will introduce a new zookeeper call. Given - // createTable isn't a frequent operation, that should be ok. + //acquire the table write lock, blocking. Make sure that it is released. 
+ this.tableLock.acquire(); + boolean success = false; try { - if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) + String tableName = this.hTableDescriptor.getNameAsString(); + if (MetaReader.tableExists(catalogTracker, tableName)) { throw new TableExistsException(tableName); - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " enabling because of a ZooKeeper issue", e); + } + + // If we have multiple client threads trying to create the table at the + // same time, given the async nature of the operation, the table + // could be in a state where .META. table hasn't been updated yet in + // the process() function. + // Use enabling state to tell if there is already a request for the same + // table in progress. This will introduce a new zookeeper call. Given + // createTable isn't a frequent operation, that should be ok. + //TODO: now that we have table locks, re-evaluate above + try { + if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) { + throw new TableExistsException(tableName); + } + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " enabling because of a ZooKeeper issue", e); + } + success = true; + } finally { + if (!success) { + releaseTableLock(); + } } + return this; } - @Override public String toString() { String name = "UnknownServerName"; @@ -126,8 +149,9 @@ public class CreateTableHandler extends EventHandler { @Override public void process() { String tableName = this.hTableDescriptor.getNameAsString(); + LOG.info("Attempting to create the table " + tableName); + try { - LOG.info("Attempting to create the table " + tableName); MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost(); if (cpHost != null) { cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions); @@ -140,6 +164,18 @@ public class CreateTableHandler extends EventHandler { LOG.error("Error trying to create the table " + tableName, e); } catch (KeeperException e) { LOG.error("Error trying to create the table " + tableName, e); + } finally { + releaseTableLock(); + } + } + + private void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 868cd5e..9c1239b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@ -40,9 +40,12 @@ public class DeleteTableHandler extends TableEventHandler { private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); public DeleteTableHandler(byte [] tableName, Server server, - final MasterServices masterServices) - throws IOException { + final MasterServices masterServices) { super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); + } + + @Override + protected void prepareWithTableLock() throws IOException { // The next call fails if no such table. 
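// Sketch of the subclass contract used by DeleteTableHandler here:
// prepareWithTableLock() runs inside prepare() while the table write lock is
// held, so validation such as the getTableDescriptor() call below needs no
// extra lock handling. MyHandler is hypothetical, not part of this patch.
class MyHandler extends TableEventHandler {
  MyHandler(byte[] tableName, Server server, MasterServices services) {
    super(EventType.C_M_MODIFY_TABLE, tableName, server, services);
  }
  @Override
  protected void prepareWithTableLock() throws IOException {
    super.prepareWithTableLock();
    getTableDescriptor(); // safe: lock held; prepare() releases it on failure
  }
  @Override
  protected void handleTableOperation(List<HRegionInfo> regions)
      throws IOException {
    // the actual table operation runs here, also under the table lock
  }
}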
getTableDescriptor(); } @@ -92,6 +95,16 @@ public class DeleteTableHandler extends TableEventHandler { } @Override + protected void releaseTableLock() { + super.releaseTableLock(); + try { + masterServices.getTableLockManager().tableDeleted(tableName); + } catch (IOException ex) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", ex); //not critical + } + } + + @Override public String toString() { String name = "UnknownServerName"; if(server != null && server.getServerName() != null) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java index 3a6b629..f464613 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java @@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; import org.cloudera.htrace.Trace; @@ -50,40 +52,62 @@ public class DisableTableHandler extends EventHandler { private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; + private final CatalogTracker catalogTracker; + private final boolean skipTableStateCheck; + private TableLock tableLock; public DisableTableHandler(Server server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) - throws TableNotFoundException, TableNotEnabledException, IOException { + TableLockManager tableLockManager, boolean skipTableStateCheck) { super(server, EventType.C_M_DISABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(this.tableName); this.assignmentManager = assignmentManager; - // Check if table exists - // TODO: do we want to keep this in-memory as well? i guess this is - // part of old master rewrite, schema to zk to check for table - // existence and such - if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { - throw new TableNotFoundException(this.tableNameStr); - } + this.catalogTracker = catalogTracker; + this.tableLockManager = tableLockManager; + this.skipTableStateCheck = skipTableStateCheck; + } - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. 
- if (!skipTableStateCheck) - { - try { - if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable - (this.tableNameStr)) { - LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable"); - throw new TableNotEnabledException(this.tableNameStr); + public DisableTableHandler prepare() + throws TableNotFoundException, TableNotEnabledException, IOException { + //acquire the table write lock, blocking + this.tableLock = this.tableLockManager.writeLock(tableName, + EventType.C_M_DISABLE_TABLE.toString()); + this.tableLock.acquire(); + + boolean success = false; + try { + // Check if table exists + if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { + throw new TableNotFoundException(this.tableNameStr); + } + + // There could be multiple client requests trying to disable or enable + // the table at the same time. Ensure only the first request is honored + // After that, no other requests can be accepted until the table reaches + // DISABLED or ENABLED. + //TODO: reevaluate this since we have table locks now + if (!skipTableStateCheck) { + try { + if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable + (this.tableNameStr)) { + LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable"); + throw new TableNotEnabledException(this.tableNameStr); + } + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " disabling because of a ZooKeeper issue", e); } - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " disabling because of a ZooKeeper issue", e); + } + success = true; + } finally { + if (!success) { + releaseTableLock(); } } + + return this; } @Override @@ -113,6 +137,18 @@ public class DisableTableHandler extends EventHandler { LOG.error("Error trying to disable table " + this.tableNameStr, e); } catch (KeeperException e) { LOG.error("Error trying to disable table " + this.tableNameStr, e); + } finally { + releaseTableLock(); + } + } + + private void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java index f46c870..fd1049d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.zookeeper.KeeperException; @@ -55,41 +57,60 @@ public class EnableTableHandler extends EventHandler { private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; - private final CatalogTracker ct; + private final TableLockManager tableLockManager; + private final CatalogTracker catalogTracker; private boolean retainAssignment = false; + private TableLock tableLock; public EnableTableHandler(Server 
server, byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) - throws TableNotFoundException, TableNotDisabledException, IOException { + TableLockManager tableLockManager, boolean skipTableStateCheck) { super(server, EventType.C_M_ENABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(tableName); - this.ct = catalogTracker; + this.catalogTracker = catalogTracker; this.assignmentManager = assignmentManager; + this.tableLockManager = tableLockManager; this.retainAssignment = skipTableStateCheck; - // Check if table exists - if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { - throw new TableNotFoundException(Bytes.toString(tableName)); - } + } - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. - if (!skipTableStateCheck) - { - try { - if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable - (this.tableNameStr)) { - LOG.info("Table " + tableNameStr + " isn't disabled; skipping enable"); - throw new TableNotDisabledException(this.tableNameStr); + public EnableTableHandler prepare() + throws TableNotFoundException, TableNotDisabledException, IOException { + //acquire the table write lock, blocking + this.tableLock = this.tableLockManager.writeLock(tableName, + EventType.C_M_ENABLE_TABLE.toString()); + this.tableLock.acquire(); + + boolean success = false; + try { + // Check if table exists + if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { + throw new TableNotFoundException(Bytes.toString(tableName)); + } + + // There could be multiple client requests trying to disable or enable + // the table at the same time. Ensure only the first request is honored + // After that, no other requests can be accepted until the table reaches + // DISABLED or ENABLED. + if (!retainAssignment) { + try { + if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable + (this.tableNameStr)) { + LOG.info("Table " + tableNameStr + " isn't disabled; skipping enable"); + throw new TableNotDisabledException(this.tableNameStr); + } + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " enabling because of a ZooKeeper issue", e); } - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " enabling because of a ZooKeeper issue", e); + } + success = true; + } finally { + if (!success) { + releaseTableLock(); } } + return this; } @Override @@ -121,6 +142,18 @@ public class EnableTableHandler extends EventHandler { LOG.error("Error trying to enable the table " + this.tableNameStr, e); } catch (InterruptedException e) { LOG.error("Error trying to enable the table " + this.tableNameStr, e); + } finally { + releaseTableLock(); + } + } + + private void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } } } @@ -134,7 +167,7 @@ public class EnableTableHandler extends EventHandler { // Get the regions of this table. We're done when all listed // tables are onlined. 
List> tableRegionsAndLocations = MetaReader - .getTableRegionsAndLocations(this.ct, tableName, true); + .getTableRegionsAndLocations(this.catalogTracker, tableName, true); int countOfRegionsInTable = tableRegionsAndLocations.size(); List regions = regionsToAssignWithServerName(tableRegionsAndLocations); int regionsCount = regions.size(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java index a175c21..2f81c0d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java @@ -35,16 +35,20 @@ public class ModifyTableHandler extends TableEventHandler { public ModifyTableHandler(final byte [] tableName, final HTableDescriptor htd, final Server server, - final MasterServices masterServices) - throws IOException { + final MasterServices masterServices) { super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); - // Check table exists. - getTableDescriptor(); // This is the new schema we are going to write out as this modification. this.htd = htd; } @Override + protected void prepareWithTableLock() throws IOException { + super.prepareWithTableLock(); + // Check table exists. + getTableDescriptor(); + } + + @Override protected void handleTableOperation(List hris) throws IOException { MasterCoprocessorHost cpHost = ((HMaster) this.server) diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java index 9022bf0..b82a1ea 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java @@ -40,14 +40,19 @@ public class TableAddFamilyHandler extends TableEventHandler { private final HColumnDescriptor familyDesc; public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, - Server server, final MasterServices masterServices) throws IOException { + Server server, final MasterServices masterServices) { super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + this.familyDesc = familyDesc; + } + + @Override + protected void prepareWithTableLock() throws IOException { + super.prepareWithTableLock(); HTableDescriptor htd = getTableDescriptor(); if (htd.hasFamily(familyDesc.getName())) { throw new InvalidFamilyOperationException("Family '" + familyDesc.getNameAsString() + "' already exists so cannot be added"); } - this.familyDesc = familyDesc; } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java index ba46249..28cbcb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java @@ -37,11 +37,17 @@ import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Private public class TableDeleteFamilyHandler extends TableEventHandler { - private final byte [] familyName; + private byte [] familyName; public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, Server server, final MasterServices masterServices) throws 
IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices); + this.familyName = familyName; + } + + @Override + protected void prepareWithTableLock() throws IOException { + super.prepareWithTableLock(); HTableDescriptor htd = getTableDescriptor(); this.familyName = hasColumnFamily(htd, familyName); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index ad4605c..7704393 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; @@ -61,37 +62,61 @@ public abstract class TableEventHandler extends EventHandler { protected final MasterServices masterServices; protected final byte [] tableName; protected final String tableNameStr; + protected TableLock tableLock; public TableEventHandler(EventType eventType, byte [] tableName, Server server, - MasterServices masterServices) - throws IOException { + MasterServices masterServices) { super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; + this.tableNameStr = Bytes.toString(this.tableName); + } + + public TableEventHandler prepare() throws IOException { + //acquire the table write lock, blocking + this.tableLock = masterServices.getTableLockManager() + .writeLock(tableName, eventType.toString()); + this.tableLock.acquire(); + boolean success = false; try { - this.masterServices.checkTableModifiable(tableName); - } catch (TableNotDisabledException ex) { - if (isOnlineSchemaChangeAllowed() - && eventType.isOnlineSchemaChangeSupported()) { - LOG.debug("Ignoring table not disabled exception " + - "for supporting online schema changes."); - } else { - throw ex; + try { + this.masterServices.checkTableModifiable(tableName); + } catch (TableNotDisabledException ex) { + if (isOnlineSchemaChangeAllowed() + && eventType.isOnlineSchemaChangeSupported()) { + LOG.debug("Ignoring table not disabled exception " + + "for supporting online schema changes."); + } else { + throw ex; + } + } + prepareWithTableLock(); + success = true; + } finally { + if (!success ) { + releaseTableLock(); } } - this.tableNameStr = Bytes.toString(this.tableName); + return this; } - private boolean isOnlineSchemaChangeAllowed() { - return this.server.getConfiguration().getBoolean( - "hbase.online.schema.update.enable", false); - } + /** Called from prepare() while holding the table lock. Subclasses + * can do extra initialization, and not worry about the releasing + * the table lock. 
*/ + protected void prepareWithTableLock() throws IOException { + } + + private boolean isOnlineSchemaChangeAllowed() { + return this.server.getConfiguration().getBoolean( + "hbase.online.schema.update.enable", false); + } @Override public void process() { try { LOG.info("Handling table operation " + eventType + " on table " + Bytes.toString(tableName)); + List hris = MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); @@ -110,7 +135,19 @@ public abstract class TableEventHandler extends EventHandler { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); - } + } finally { + releaseTableLock(); + } + } + + protected void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } + } } public boolean reOpenAllRegions(List regions) throws IOException { @@ -137,7 +174,7 @@ public abstract class TableEventHandler extends EventHandler { reRegions.add(hri); serverToRegions.get(rsLocation).add(hri); } - + LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size() + " region servers."); this.masterServices.getAssignmentManager().setRegionsToReopen(reRegions); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java index 7bf74c6..1e9b7af 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java @@ -25,12 +25,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.util.Bytes; /** * Handles adding a new family to an existing table. @@ -41,11 +39,16 @@ public class TableModifyFamilyHandler extends TableEventHandler { public TableModifyFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, Server server, - final MasterServices masterServices) throws IOException { + final MasterServices masterServices) { super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices); + this.familyDesc = familyDesc; + } + + @Override + protected void prepareWithTableLock() throws IOException { + super.prepareWithTableLock(); HTableDescriptor htd = getTableDescriptor(); hasColumnFamily(htd, familyDesc.getName()); - this.familyDesc = familyDesc; } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/DeletionListener.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/DeletionListener.java new file mode 100644 index 0000000..f6a788f --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/DeletionListener.java @@ -0,0 +1,101 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * A ZooKeeper watcher meant to detect deletions of ZNodes. + */ +@InterfaceAudience.Private +public class DeletionListener extends ZooKeeperListener { + + private static final Log LOG = LogFactory.getLog(DeletionListener.class); + + private final String pathToWatch; + private final CountDownLatch deletedLatch; + + private volatile Throwable exception; + + /** + * Create a new instance of the deletion watcher. + * @param zkWatcher ZookeeperWatcher instance + * @param pathToWatch (Fully qualified) ZNode path that we are waiting to + * be deleted. + * @param deletedLatch Count down on this latch when deletion has occured. + */ + public DeletionListener(ZooKeeperWatcher zkWatcher, String pathToWatch, + CountDownLatch deletedLatch) { + super(zkWatcher); + this.pathToWatch = pathToWatch; + this.deletedLatch = deletedLatch; + exception = null; + } + + /** + * Check if an exception has occurred when re-setting the watch. + * @return True if we were unable to re-set a watch on a ZNode due to + * an exception. + */ + public boolean hasException() { + return exception != null; + } + + /** + * Get the last exception which has occurred when re-setting the watch. + * Use hasException() to check whether or not an exception has occurred. + * @return The last exception observed when re-setting the watch. + */ + public Throwable getException() { + return exception; + } + + @Override + public void nodeDataChanged(String path) { + if (!path.equals(pathToWatch)) { + return; + } + try { + if (!(ZKUtil.setWatchIfNodeExists(watcher, pathToWatch))) { + deletedLatch.countDown(); + } + } catch (KeeperException ex) { + exception = ex; + deletedLatch.countDown(); + LOG.error("Error when re-setting the watch on " + pathToWatch, ex); + } + } + + @Override + public void nodeDeleted(String path) { + if (!path.equals(pathToWatch)) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Processing delete on " + pathToWatch); + } + deletedLatch.countDown(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 11d0e7f..9a4c945 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -49,24 +49,24 @@ import org.apache.zookeeper.proto.SetDataRequest; /** * A zookeeper that can handle 'recoverable' errors. 
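// Hedged sketch of how the DeletionListener introduced above is driven
// (this mirrors its use in ZKInterProcessLockBase later in this patch);
// zkWatcher and the example znode path are assumptions.
String znode = "/hbase/table-lock/t1/write-0000000001";
CountDownLatch deletedLatch = new CountDownLatch(1);
DeletionListener listener = new DeletionListener(zkWatcher, znode, deletedLatch);
zkWatcher.registerListener(listener);
try {
  // Only wait if the znode still exists; otherwise it is already deleted.
  if (ZKUtil.setWatchIfNodeExists(zkWatcher, znode)) {
    deletedLatch.await();
    if (listener.hasException()) {
      throw new IOException("Failed to re-set watch", listener.getException());
    }
  }
} finally {
  zkWatcher.unregisterListener(listener);
}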
- * To handle recoverable errors, developers need to realize that there are two - * classes of requests: idempotent and non-idempotent requests. Read requests - * and unconditional sets and deletes are examples of idempotent requests, they - * can be reissued with the same results. - * (Although, the delete may throw a NoNodeException on reissue its effect on - * the ZooKeeper state is the same.) Non-idempotent requests need special - * handling, application and library writers need to keep in mind that they may - * need to encode information in the data or name of znodes to detect - * retries. A simple example is a create that uses a sequence flag. - * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection - * loss exception, that process will reissue another - * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a - * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be - * that x-109 was the result of the previous create, so the process actually - * owns both x-109 and x-111. An easy way around this is to use "x-process id-" + * To handle recoverable errors, developers need to realize that there are two + * classes of requests: idempotent and non-idempotent requests. Read requests + * and unconditional sets and deletes are examples of idempotent requests, they + * can be reissued with the same results. + * (Although, the delete may throw a NoNodeException on reissue its effect on + * the ZooKeeper state is the same.) Non-idempotent requests need special + * handling, application and library writers need to keep in mind that they may + * need to encode information in the data or name of znodes to detect + * retries. A simple example is a create that uses a sequence flag. + * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection + * loss exception, that process will reissue another + * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a + * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be + * that x-109 was the result of the previous create, so the process actually + * owns both x-109 and x-111. An easy way around this is to use "x-process id-" * when doing the create. If the process is using an id of 352, before reissuing - * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", - * "x-352-109", x-333-110". The process will know that the original create + * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", + * "x-352-109", x-333-110". The process will know that the original create * succeeded an the znode it created is "x-352-109". 
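// Stand-alone illustration (raw ZooKeeper client API, not HBase code) of the
// retry-detection scheme described above: embed an identifier in the name of
// a SEQUENTIAL znode so a retried create can be recognized after a connection
// loss. Assumes a connected ZooKeeper handle zk and payload byte[] data; the
// enclosing method would declare throws KeeperException, InterruptedException.
String id = "352"; // e.g. processID@hostName
String created = null;
try {
  created = zk.create("/parent/x-" + id + "-", data,
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException.ConnectionLossException e) {
  for (String child : zk.getChildren("/parent", false)) {
    if (child.startsWith("x-" + id + "-")) {
      created = "/parent/" + child; // the earlier create actually succeeded
    }
  }
  if (created == null) {
    // the first attempt really failed; it is safe to retry the create
  }
}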
* @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" */ @@ -99,15 +99,25 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) + Watcher watcher, int maxRetries, int retryIntervalMillis) + throws IOException { + this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, + null); + } + + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, + Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier) throws IOException { this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); - // the identifier = processID@hostName - this.identifier = ManagementFactory.getRuntimeMXBean().getName(); + if (identifier == null || identifier.length() == 0) { + // the identifier = processID@hostName + identifier = ManagementFactory.getRuntimeMXBean().getName(); + } LOG.info("The identifier of this process is " + identifier); + this.identifier = identifier; this.id = Bytes.toBytes(identifier); this.watcher = watcher; @@ -343,7 +353,7 @@ public class RecoverableZooKeeper { /** * setData is NOT an idempotent operation. Retry may cause BadVersion Exception - * Adding an identifier field into the data to check whether + * Adding an identifier field into the data to check whether * badversion is caused by the result of previous correctly setData * @return Stat instance */ @@ -390,17 +400,17 @@ public class RecoverableZooKeeper { /** *
<p>
- * NONSEQUENTIAL create is idempotent operation. + * NONSEQUENTIAL create is idempotent operation. * Retry before throwing exceptions. * But this function will not throw the NodeExist exception back to the * application. *
</p>
*
<p>
- * But SEQUENTIAL is NOT idempotent operation. It is necessary to add - * identifier to the path to verify, whether the previous one is successful + * But SEQUENTIAL is NOT idempotent operation. It is necessary to add + * identifier to the path to verify, whether the previous one is successful * or not. *
</p>
- * + * * @return Path */ public String create(String path, byte[] data, List acl, @@ -417,12 +427,12 @@ public class RecoverableZooKeeper { return createSequential(path, newData, acl, createMode); default: - throw new IllegalArgumentException("Unrecognized CreateMode: " + + throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode); } } - private String createNonSequential(String path, byte[] data, List acl, + private String createNonSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. @@ -435,14 +445,14 @@ public class RecoverableZooKeeper { if (isRetry) { // If the connection was lost, there is still a possibility that // we have successfully created the node at our previous attempt, - // so we read the node and compare. + // so we read the node and compare. byte[] currentData = zk.getData(path, false, null); if (currentData != null && - Bytes.compareTo(currentData, data) == 0) { + Bytes.compareTo(currentData, data) == 0) { // We successfully created a non-sequential node return path; } - LOG.error("Node " + path + " already exists with " + + LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) + ", could not write " + Bytes.toStringBinary(data)); throw e; @@ -466,8 +476,8 @@ public class RecoverableZooKeeper { isRetry = true; } } - - private String createSequential(String path, byte[] data, + + private String createSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); @@ -573,7 +583,7 @@ public class RecoverableZooKeeper { } return null; } - + public byte[] removeMetaData(byte[] data) { if(data == null || data.length == 0) { return data; @@ -642,7 +652,7 @@ public class RecoverableZooKeeper { * @param prefixes the prefixes to include in the result * @return list of every element that starts with one of the prefixes */ - private static List filterByPrefix(List nodes, + private static List filterByPrefix(List nodes, String... 
prefixes) { List lockChildren = new ArrayList(); for (String child : nodes){ @@ -655,4 +665,8 @@ public class RecoverableZooKeeper { } return lockChildren; } + + public String getIdentifier() { + return identifier; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 583e3e2..ea70325 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -26,18 +26,15 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Properties; -import java.util.HashMap; import java.util.Map; +import java.util.Properties; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.authentication.util.KerberosUtil; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,17 +49,19 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Op; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetDataRequest; @@ -83,7 +82,7 @@ public class ZKUtil { private static final Log LOG = LogFactory.getLog(ZKUtil.class); // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. 
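// Sketch of the identifier plumbing added above: a caller-supplied identifier
// flows through ZKUtil.connect() into RecoverableZooKeeper and is readable
// back via the new getIdentifier(); passing null keeps the old
// processID@hostName default. conf and watcher are assumed to exist.
RecoverableZooKeeper rzk = ZKUtil.connect(conf,
    ZKConfig.getZKQuorumServersString(conf), watcher, "master-demo");
String ident = rzk.getIdentifier(); // "master-demo"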
- private static final char ZNODE_PATH_SEPARATOR = '/'; + public static final char ZNODE_PATH_SEPARATOR = '/'; private static int zkDumpConnectionTimeOut; /** @@ -107,18 +106,18 @@ public class ZKUtil { public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher) throws IOException { - return connect(conf, ensemble, watcher, ""); + return connect(conf, ensemble, watcher, null); } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String descriptor) + Watcher watcher, final String identifier) throws IOException { if(ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); } int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" + + LOG.debug(identifier + " opening connection to ZooKeeper with ensemble (" + ensemble + ")"); int retry = conf.getInt("zookeeper.recovery.retry", 3); int retryIntervalMillis = @@ -126,7 +125,7 @@ public class ZKUtil { zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", 1000); return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis); + retry, retryIntervalMillis, identifier); } /** @@ -438,6 +437,30 @@ public class ZKUtil { } /** + * Watch the specified znode, but only if exists. Useful when watching + * for deletions. Uses .getData() (and handles NoNodeException) instead + * of .exists() to accomplish this, as .getData() will only set a watch if + * the znode exists. + * @param zkw zk reference + * @param znode path of node to watch + * @return true if the watch is set, false if node does not exists + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean setWatchIfNodeExists(ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + zkw.getRecoverableZooKeeper().getData(znode, true, null); + return true; + } catch (NoNodeException e) { + return false; + } catch (InterruptedException e) { + LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e); + zkw.interruptedException(e); + return false; + } + } + + /** * Check if the specified node exists. Sets no watches. * * @param zkw zk reference @@ -526,15 +549,13 @@ public class ZKUtil { /** * Lists the children of the specified znode without setting any watches. * - * Used to list the currently online regionservers and their addresses. - * * Sets no watches at all, this method is best effort. * * Returns an empty list if the node has no children. Returns null if the * parent node itself does not exist. * * @param zkw zookeeper reference - * @param znode node to get children of as addresses + * @param znode node to get children * @return list of data of children of specified znode, empty if no children, * null if parent does not exist * @throws KeeperException if unexpected zookeeper exception @@ -1023,6 +1044,36 @@ public class ZKUtil { } /** + * Creates the specified znode with the specified data but does not watch it. + * + * Returns the znode of the newly created node + * + * If there is another problem, a KeeperException will be thrown. 
+ * + * @param zkw zk reference + * @param znode path of node + * @param data data of node + * @param createMode specifying whether the node to be created is ephemeral and/or sequential + * @return true name of the newly created znode or null + * @throws KeeperException if unexpected zookeeper exception + */ + public static String createNodeIfNotExistsNoWatch(ZooKeeperWatcher zkw, String znode, + byte[] data, CreateMode createMode) throws KeeperException { + + String createdZNode = null; + try { + createdZNode = zkw.getRecoverableZooKeeper().create(znode, data, + createACL(zkw, znode), createMode); + } catch (KeeperException.NodeExistsException nee) { + return znode; + } catch (InterruptedException e) { + zkw.interruptedException(e); + return null; + } + return createdZNode; + } + + /** * Creates the specified node with the specified data and watches it. * *
<p>
Throws an exception if the node already exists. @@ -1308,7 +1359,7 @@ public class ZKUtil { CreateAndFailSilent op = (CreateAndFailSilent) o; return getPath().equals(op.getPath()) && Arrays.equals(data, op.data); } - + @Override public int hashCode() { int ret = 17 + getPath().hashCode() * 31; @@ -1332,7 +1383,7 @@ public class ZKUtil { return super.equals(o); } - + @Override public int hashCode() { return getPath().hashCode(); @@ -1362,7 +1413,7 @@ public class ZKUtil { SetData op = (SetData) o; return getPath().equals(op.getPath()) && Arrays.equals(data, op.data); } - + @Override public int hashCode() { int ret = getPath().hashCode(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 3b56e74..6952b97 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -103,6 +103,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String splitLogZNode; // znode containing the state of the load balancer public String balancerZNode; + // znode containing the lock for the tables + public String tableLockZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = @@ -117,23 +119,23 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { /** * Instantiate a ZooKeeper connection and watcher. - * @param descriptor Descriptive string that is added to zookeeper sessionid - * and used as identifier for this instance. + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. Use null for default. * @throws IOException * @throws ZooKeeperConnectionException */ - public ZooKeeperWatcher(Configuration conf, String descriptor, + public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) throws ZooKeeperConnectionException, IOException { - this(conf, descriptor, abortable, false); + this(conf, identifier, abortable, false); } /** * Instantiate a ZooKeeper connection and watcher. - * @param descriptor Descriptive string that is added to zookeeper sessionid - * and used as identifier for this instance. + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. Use null for default. * @throws IOException * @throws ZooKeeperConnectionException */ - public ZooKeeperWatcher(Configuration conf, String descriptor, + public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { this.conf = conf; @@ -147,10 +149,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.quorum = ZKConfig.getZKQuorumServersString(conf); // Identifier will get the sessionid appended later below down when we // handle the syncconnect event. 
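// The table-lock parent znode is now created with the other base znodes and
// its name is configurable; a short sketch of reading it back (the abortable
// instance is assumed, and the value set here is just the default).
Configuration conf = HBaseConfiguration.create();
conf.set("zookeeper.znode.tableLock", "table-lock"); // the default
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, null, abortable, true);
String lockRoot = zkw.tableLockZNode; // e.g. "/hbase/table-lock"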
- this.identifier = descriptor; + this.identifier = identifier; this.abortable = abortable; setNodeNames(conf); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); if (canCreateBaseZNode) { createBaseZNodes(); } @@ -166,6 +168,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); + ZKUtil.createAndFailSilent(this, tableLockZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -215,6 +218,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); balancerZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.balancer", "balancer")); + tableLockZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.tableLock", "table-lock")); } /** @@ -234,6 +239,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { listeners.add(0, listener); } + public void unregisterListener(ZooKeeperListener listener) { + listeners.remove(listener); + } + /** * Get the connection to ZooKeeper. * @return connection reference to zookeeper @@ -355,10 +364,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { if (this.abortable != null) this.abortable.abort(msg, new KeeperException.SessionExpiredException()); break; - + case ConnectedReadOnly: - break; - + break; + default: throw new IllegalStateException("Received event is not valid."); } @@ -437,7 +446,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public void abort(String why, Throwable e) { this.abortable.abort(why, e); } - + @Override public boolean isAborted() { return this.abortable.isAborted(); @@ -449,4 +458,5 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String getMasterAddressZNode() { return this.masterAddressZNode; } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java new file mode 100644 index 0000000..16cfa5a --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java @@ -0,0 +1,346 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper.lock; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.InterProcessLock; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.DeletionListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.data.Stat; + +import com.google.common.base.Preconditions; + +/** + * ZooKeeper based HLock implementation. Based on the Shared Locks recipe. + * (see: + * + * ZooKeeper Recipes and Solutions + * ) + */ +@InterfaceAudience.Private +public abstract class ZKInterProcessLockBase implements InterProcessLock { + + private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class); + + /** ZNode prefix used by processes acquiring reader locks */ + protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-"; + + /** ZNode prefix used by processes acquiring writer locks */ + protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-"; + + protected final ZooKeeperWatcher zkWatcher; + protected final String parentLockNode; + protected final String fullyQualifiedZNode; + protected final byte[] metadata; + protected final MetadataHandler handler; + + // If we acquire a lock, update this field + protected final AtomicReference acquiredLock = + new AtomicReference(null); + + /** + * Represents information about a lock held by this thread. + */ + protected static class AcquiredLock { + private final String path; + private final int version; + + /** + * Store information about a lock. + * @param path The path to a lock's ZNode + * @param version The current version of the lock's ZNode + */ + public AcquiredLock(String path, int version) { + this.path = path; + this.version = version; + } + + public String getPath() { + return path; + } + + public int getVersion() { + return version; + } + + @Override + public String toString() { + return "AcquiredLockInfo{" + + "path='" + path + '\'' + + ", version=" + version + + '}'; + } + } + + protected static class ZNodeComparator implements Comparator { + + public static final ZNodeComparator COMPARATOR = new ZNodeComparator(); + + private ZNodeComparator() { + } + + /** Parses sequenceId from the znode name. Zookeeper documentation + * states: The sequence number is always fixed length of 10 digits, 0 padded + */ + public static int getChildSequenceId(String childZNode) { + Preconditions.checkNotNull(childZNode); + assert childZNode.length() >= 10; + String sequenceIdStr = childZNode.substring(childZNode.length() - 10); + return Integer.parseInt(sequenceIdStr); + } + + @Override + public int compare(String zNode1, String zNode2) { + int seq1 = getChildSequenceId(zNode1); + int seq2 = getChildSequenceId(zNode2); + return seq1 - seq2; + } + } + + /** + * Called by implementing classes. 
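// Quick illustration of the child-znode naming this recipe relies on:
// ZooKeeper appends a fixed-length, zero-padded 10-digit sequence number,
// which ZNodeComparator parses to order the waiting lock requests.
String child = "write-0000000042";
int seq = ZNodeComparator.getChildSequenceId(child); // 42
boolean isWrite = child.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX); // true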
+ * @param zkWatcher + * @param parentLockNode The lock ZNode path + * @param metadata + * @param handler + * @param childNode The prefix for child nodes created under the parent + */ + protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher, + String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) { + this.zkWatcher = zkWatcher; + this.parentLockNode = parentLockNode; + this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode); + this.metadata = metadata; + this.handler = handler; + try { + ZKUtil.createWithParents(zkWatcher, parentLockNode); + } catch (KeeperException ex) { + LOG.warn("Failed to create znode:" + parentLockNode, ex); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void acquire() throws IOException, InterruptedException { + tryAcquire(-1); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean tryAcquire(long timeoutMs) + throws IOException, InterruptedException { + boolean hasTimeout = timeoutMs != -1; + long waitUntilMs = + hasTimeout ?EnvironmentEdgeManager.currentTimeMillis() + timeoutMs : -1; + String createdZNode = createLockZNode(); + while (true) { + List children; + try { + children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + String pathToWatch; + if ((pathToWatch = getLockPath(createdZNode, children)) == null) { + break; + } + CountDownLatch deletedLatch = new CountDownLatch(1); + String zkPathToWatch = + ZKUtil.joinZNode(parentLockNode, pathToWatch); + DeletionListener deletionListener = + new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch); + zkWatcher.registerListener(deletionListener); + try { + if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) { + // Wait for the watcher to fire + if (hasTimeout) { + long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTimeMillis(); + if (remainingMs < 0 || + !deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) { + LOG.warn("Unable to acquire the lock in " + timeoutMs + + " milliseconds."); + try { + ZKUtil.deleteNode(zkWatcher, createdZNode); + } catch (KeeperException e) { + LOG.warn("Unable to remove ZNode " + createdZNode); + } + return false; + } + } else { + deletedLatch.await(); + } + if (deletionListener.hasException()) { + Throwable t = deletionListener.getException(); + throw new IOException("Exception in the watcher", t); + } + } + } catch (KeeperException e) { + throw new IOException("Unexpected ZooKeeper exception", e); + } finally { + zkWatcher.unregisterListener(deletionListener); + } + } + updateAcquiredLock(createdZNode); + LOG.debug("Successfully acquired a lock for " + createdZNode); + return true; + } + + private String createLockZNode() { + try { + return ZKUtil.createNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode, + metadata, CreateMode.EPHEMERAL_SEQUENTIAL); + } catch (KeeperException ex) { + LOG.warn("Failed to create znode: " + fullyQualifiedZNode, ex); + return null; + } + } + + /** + * Check if a child znode represents a write lock. + * @param child The child znode we want to check. 
+   * @return whether the child znode represents a write lock
+   */
+  protected static boolean isChildWriteLock(String child) {
+    int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
+    String suffix = child.substring(idx + 1);
+    return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
+  }
+
+  /**
+   * Update state to indicate that a lock is held.
+   * @param createdZNode The lock znode
+   * @throws IOException If an unrecoverable ZooKeeper error occurs
+   */
+  protected void updateAcquiredLock(String createdZNode) throws IOException {
+    Stat stat = new Stat();
+    byte[] data = null;
+    Exception ex = null;
+    try {
+      data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat);
+    } catch (KeeperException e) {
+      LOG.warn("Cannot getData for znode: " + createdZNode, e);
+      ex = e;
+    }
+    if (data == null) {
+      LOG.error("Can't acquire a lock on a non-existent node " + createdZNode);
+      throw new IllegalStateException("ZNode " + createdZNode +
+          " no longer exists!", ex);
+    }
+    AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion());
+    if (!acquiredLock.compareAndSet(null, newLock)) {
+      LOG.error("The lock " + fullyQualifiedZNode +
+          " has already been acquired by another process!");
+      throw new IllegalStateException(fullyQualifiedZNode +
+          " is held by another process");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void release() throws IOException, InterruptedException {
+    AcquiredLock lock = acquiredLock.get();
+    if (lock == null) {
+      LOG.error("Cannot release lock" +
+          ", process does not have a lock for " + fullyQualifiedZNode);
+      throw new IllegalStateException("No lock held for " + fullyQualifiedZNode);
+    }
+    try {
+      if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
+        ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion());
+        if (!acquiredLock.compareAndSet(lock, null)) {
+          LOG.debug("Current process no longer holds " + lock + " for " +
+              fullyQualifiedZNode);
+          throw new IllegalStateException("Not holding a lock for " +
+              fullyQualifiedZNode + "!");
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Successfully released " + lock.getPath());
+      }
+    } catch (BadVersionException e) {
+      throw new IllegalStateException(e);
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
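A note on the versioned delete in release(): handing ZooKeeper the version recorded at acquire time makes the delete conditional, so a znode that changed in the meantime is not silently removed and the BadVersionException surfaces as an IllegalStateException. A bare-ZooKeeper sketch of the same idiom, with an invented path (zk is an org.apache.zookeeper.ZooKeeper client):

  // Conditional delete: fails with KeeperException.BadVersionException if the
  // node's data version no longer matches the one we recorded.
  Stat stat = zk.exists("/locks/example/write-0000000003", false);
  if (stat != null) {
    zk.delete("/locks/example/write-0000000003", stat.getVersion());
  }

+  /**
+   * Process metadata stored in a ZNode using a callback object passed to
+   * this instance.
+   *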

+   * @param lockZNode The node holding the metadata
+   * @return True if metadata was ready and processed
+   * @throws IOException If an unexpected ZooKeeper error occurs
+   * @throws InterruptedException If interrupted when reading the metadata
+   */
+  protected boolean handleLockMetadata(String lockZNode)
+  throws IOException, InterruptedException {
+    byte[] metadata = null;
+    try {
+      metadata = ZKUtil.getData(zkWatcher, lockZNode);
+    } catch (KeeperException ex) {
+      LOG.warn("Cannot getData for znode: " + lockZNode, ex);
+    }
+    if (metadata == null) {
+      return false;
+    }
+    if (handler != null) {
+      handler.handleMetadata(metadata);
+    }
+    return true;
+  }
+
+  /**
+   * Determine, based on the list of children under a ZNode, whether or not
+   * the process that created a specified ZNode has obtained a lock. If a
+   * lock is not obtained, return the path that we should watch awaiting its
+   * deletion. Otherwise, return null.
+   * This method is abstract as the logic for determining whether or not a
+   * lock is obtained depends on the type of lock being implemented.
+   * @param myZNode The ZNode created by the process attempting to acquire
+   * a lock
+   * @param children List of all child ZNodes under the lock's parent ZNode
+   * @return The path to watch, or null if myZNode can represent a correctly
+   * acquired lock.
+   */
+  protected abstract String getLockPath(String myZNode, List<String> children)
+  throws IOException, InterruptedException;
+}
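handleLockMetadata() is the only consumer of the MetadataHandler callback, so a concrete handler is worth seeing; a minimal sketch, modeled on the handler the new tests register (the log message format is invented):

  // Minimal MetadataHandler sketch: report whoever currently holds the lock.
  MetadataHandler handler = new MetadataHandler() {
    @Override
    public void handleMetadata(byte[] ownerMetadata) {
      // The byte[] layout is whatever the lock creator stored as metadata.
      LOG.info("Lock is held; owner metadata: "
          + Bytes.toStringBinary(ownerMetadata));
    }
  };

diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java
new file mode 100644
index 0000000..87aa152
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * ZooKeeper based read lock: does not exclude other read locks, but excludes
+ * and is excluded by write locks.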
+ */
+@InterfaceAudience.Private
+public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
+
+  private static final Log LOG = LogFactory.getLog(ZKInterProcessReadLock.class);
+
+  public ZKInterProcessReadLock(ZooKeeperWatcher zooKeeperWatcher,
+      String znode, byte[] metadata, MetadataHandler handler) {
+    super(zooKeeperWatcher, znode, metadata, handler, READ_LOCK_CHILD_NODE_PREFIX);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected String getLockPath(String createdZNode, List<String> children)
+  throws IOException, InterruptedException {
+    TreeSet<String> writeChildren =
+        new TreeSet<String>(ZNodeComparator.COMPARATOR);
+    for (String child : children) {
+      if (isChildWriteLock(child)) {
+        writeChildren.add(child);
+      }
+    }
+    if (writeChildren.isEmpty()) {
+      return null;
+    }
+    SortedSet<String> lowerChildren = writeChildren.headSet(createdZNode);
+    if (lowerChildren.isEmpty()) {
+      return null;
+    }
+    String pathToWatch = lowerChildren.last();
+    String nodeHoldingLock = lowerChildren.first();
+    String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
+    try {
+      handleLockMetadata(znode);
+    } catch (IOException e) {
+      LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
+    }
+    return pathToWatch;
+  }
+
+  @Override
+  public void reapAllLocks() throws IOException {
+    throw new UnsupportedOperationException(
+        "Lock reaping is not supported for ZK based read locks");
+  }
+}
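To make the reader ordering concrete: a reader is blocked only by write requests with lower sequence numbers, it watches the closest such znode, and the metadata of the queue head is reported. A hedged trace of getLockPath() above, with invented sequence numbers:

  // Suppose the children of the parent lock znode are, in sequence order:
  //   read-0000000000, write-0000000001, read-0000000002 (ours)
  TreeSet<String> writeChildren = new TreeSet<String>(ZNodeComparator.COMPARATOR);
  writeChildren.add("write-0000000001");
  SortedSet<String> lower = writeChildren.headSet("read-0000000002");
  // lower = {write-0000000001}: we watch write-0000000001 until it is deleted.
  // Had there been no write- child below us, headSet() would be empty and
  // getLockPath() would return null, granting the read lock immediately.

diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadWriteLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadWriteLock.java
new file mode 100644
index 0000000..425b3e1
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadWriteLock.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
+import org.apache.hadoop.hbase.InterProcessReadWriteLock;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * ZooKeeper based implementation of {@link InterProcessReadWriteLock}. This lock is fair,
+ * not reentrant, and not revocable.
+ */
+@InterfaceAudience.Private
+public class ZKInterProcessReadWriteLock implements InterProcessReadWriteLock {
+
+  private final ZooKeeperWatcher zkWatcher;
+  private final String znode;
+  private final MetadataHandler handler;
+
+  /**
+   * Creates a ZKInterProcessReadWriteLock instance.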
+ * @param zkWatcher + * @param znode ZNode path for the lock + * @param handler An object that will handle de-serializing and processing + * the metadata associated with reader or writer locks + * created by this object or null if none desired. + */ + public ZKInterProcessReadWriteLock(ZooKeeperWatcher zkWatcher, String znode, + MetadataHandler handler) { + this.zkWatcher = zkWatcher; + this.znode = znode; + this.handler = handler; + } + + /** + * {@inheritDoc} + */ + public ZKInterProcessReadLock readLock(byte[] metadata) { + return new ZKInterProcessReadLock(zkWatcher, znode, metadata, handler); + } + + /** + * {@inheritDoc} + */ + public ZKInterProcessWriteLock writeLock(byte[] metadata) { + return new ZKInterProcessWriteLock(zkWatcher, znode, metadata, handler); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java new file mode 100644 index 0000000..0a74f30 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java @@ -0,0 +1,98 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper based write lock: excludes both read locks and other write locks.
+ */
+@InterfaceAudience.Private
+public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
+
+  private static final Log LOG = LogFactory.getLog(ZKInterProcessWriteLock.class);
+
+  public ZKInterProcessWriteLock(ZooKeeperWatcher zooKeeperWatcher,
+      String znode, byte[] metadata, MetadataHandler handler) {
+    super(zooKeeperWatcher, znode, metadata, handler, WRITE_LOCK_CHILD_NODE_PREFIX);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected String getLockPath(String createdZNode, List<String> children)
+  throws IOException, InterruptedException {
+    TreeSet<String> sortedChildren =
+        new TreeSet<String>(ZNodeComparator.COMPARATOR);
+    sortedChildren.addAll(children);
+    String pathToWatch = sortedChildren.lower(createdZNode);
+    if (pathToWatch != null) {
+      String nodeHoldingLock = sortedChildren.first();
+      String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
+      try {
+        handleLockMetadata(znode);
+      } catch (IOException e) {
+        LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
+      }
+    }
+    return pathToWatch;
+  }
+
+  /**
+   * Referred to in the ZooKeeper recipes as "Revocable Shared Locks with
+   * Freaking Laser Beams" (http://zookeeper.apache.org/doc/trunk/recipes.html).
+   */
+  public void reapAllLocks() throws IOException {
+    List<String> children;
+    try {
+      children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
+    } catch (KeeperException e) {
+      LOG.error("Unexpected ZooKeeper error when listing children", e);
+      throw new IOException("Unexpected ZooKeeper exception", e);
+    }
+
+    KeeperException deferred = null;
+    for (String child : children) {
+      if (isChildWriteLock(child)) {
+        String znode = ZKUtil.joinZNode(parentLockNode, child);
+        LOG.info("Reaping write lock for znode: " + znode);
+        try {
+          ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
+        } catch (KeeperException ex) {
+          LOG.warn("Error reaping the znode for write lock: " + znode, ex);
+          deferred = ex;
+        }
+      }
+    }
+    if (deferred != null) {
+      throw new IOException("ZK exception while reaping locks", deferred);
+    }
+  }
+}
diff --git hbase-server/src/main/resources/hbase-default.xml hbase-server/src/main/resources/hbase-default.xml
index ab152be..e033d3b 100644
--- hbase-server/src/main/resources/hbase-default.xml
+++ hbase-server/src/main/resources/hbase-default.xml
@@ -822,6 +822,15 @@
   </property>
   <property>
+    <name>hbase.table.lock.enable</name>
+    <value>true</value>
+    <description>
+    Set to true to enable locking the table in zookeeper for schema change operations.
+    Table locking from the master prevents concurrent schema modifications from
+    corrupting table state.
+    </description>
+  </property>
+  <property>
     <name>dfs.support.append</name>
     <value>true</value>
     <description>Does HDFS allow appends to files?
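Since hbase.table.lock.enable defaults to true, it may help to spell out how a deployment would opt out. A sketch of the programmatic form, mirroring how the new tests tweak configuration (setting the same key to false in hbase-site.xml has the same effect):

  // Disable ZK table locking for schema changes (enabled by default above).
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean("hbase.table.lock.enable", false);
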
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java index 8758f9e..f333a51 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java @@ -18,8 +18,11 @@ */ package org.apache.hadoop.hbase; -import java.util.Set; import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -91,7 +94,7 @@ public abstract class MultithreadedTestUtil { stopped = s; } } - + public void stop() throws Exception { synchronized (this) { stopped = true; @@ -130,7 +133,7 @@ public abstract class MultithreadedTestUtil { this.stopped = true; } } - + /** * A test thread that performs a repeating operation. */ @@ -138,13 +141,48 @@ public abstract class MultithreadedTestUtil { public RepeatingTestThread(TestContext ctx) { super(ctx); } - + public final void doWork() throws Exception { while (ctx.shouldRun() && !stopped) { doAnAction(); } } - + public abstract void doAnAction() throws Exception; } + + /** + * Verify that no assertions have failed inside a future. + * Used for unit tests that spawn threads. E.g., + *

+   * <pre>{@code
+   * List<Future<Void>> results = Lists.newArrayList();
+   * Future<Void> f = executor.submit(new Callable<Void>() {
+   *   public Void call() {
+   *     assertTrue(someMethod());
+   *     return null;
+   *   }
+   * });
+   * results.add(f);
+   * assertOnFutures(results);
+   * }</pre>
+   *
+   * @param threadResults A list of futures
+   * @param <T> The type of the futures' results
+   * @throws InterruptedException If interrupted when waiting for a result
+   *                              from one of the futures
+   * @throws ExecutionException If an exception other than AssertionError
+   *                            occurs inside any of the futures
+   */
+  public static <T> void assertOnFutures(List<Future<T>> threadResults)
+  throws InterruptedException, ExecutionException {
+    for (Future<T> threadResult : threadResults) {
+      try {
+        threadResult.get();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause();
+        }
+        throw e;
+      }
+    }
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 17d0f67..82299a2 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -182,7 +183,7 @@ public class TestAssignmentManager {
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
-   * @throws DeserializationException 
+   * @throws DeserializationException
    */
   @Test(timeout = 5000)
   public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
@@ -346,7 +347,7 @@ public class TestAssignmentManager {
    * from one server to another mocking regionserver responding over zk.
    * @throws IOException
    * @throws KeeperException
-   * @throws DeserializationException 
+   * @throws DeserializationException
    */
   @Test
   public void testBalance()
@@ -361,7 +362,7 @@ public class TestAssignmentManager {
         .getConfiguration());
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
-        this.serverManager, ct, balancer, executor, null);
+        this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
     am.failoverCleanupDone.set(true);
     try {
       // Make sure our new AM gets callbacks; once registered, can't unregister.
@@ -442,7 +443,7 @@ public class TestAssignmentManager {
    * To test closed region handler to remove rit and delete corresponding znode
    * if region in pending close or closing while processing shutdown of a region
    * server.(HBASE-5927).
-   * 
+   *
    * @throws KeeperException
    * @throws IOException
    * @throws ServiceException
@@ -453,12 +454,12 @@ public class TestAssignmentManager {
     testCaseWithPartiallyDisabledState(Table.State.DISABLING);
     testCaseWithPartiallyDisabledState(Table.State.DISABLED);
   }
-  
-  
+
+
   /**
    * To test if the split region is removed from RIT if the region was in SPLITTING state but the RS
    * has actually completed the splitting in META but went down.
See HBASE-6070 and also HBASE-5806 - * + * * @throws KeeperException * @throws IOException */ @@ -469,7 +470,7 @@ public class TestAssignmentManager { // false indicate the region is not split testCaseWithSplitRegionPartial(false); } - + private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException, IOException, NodeExistsException, InterruptedException, ServiceException { // Create and startup an executor. This is used by AssignmentManager @@ -530,7 +531,7 @@ public class TestAssignmentManager { // Create an AM. AssignmentManager am = new AssignmentManager(this.server, - this.serverManager, ct, balancer, executor, null); + this.serverManager, ct, balancer, executor, null, master.getTableLockManager()); // adding region to regions and servers maps. am.regionOnline(REGIONINFO, SERVERNAME_A); // adding region in pending close. @@ -651,7 +652,7 @@ public class TestAssignmentManager { .getConfiguration()); // Create an AM. AssignmentManager am = new AssignmentManager(this.server, - this.serverManager, ct, balancer, null, null); + this.serverManager, ct, balancer, null, null, master.getTableLockManager()); try { // First make sure my mock up basically works. Unassign a region. unassign(am, SERVERNAME_A, hri); @@ -679,7 +680,7 @@ public class TestAssignmentManager { * Tests the processDeadServersAndRegionsInTransition should not fail with NPE * when it failed to get the children. Let's abort the system in this * situation - * @throws ServiceException + * @throws ServiceException */ @Test(timeout = 5000) public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE() @@ -708,7 +709,7 @@ public class TestAssignmentManager { } } /** - * TestCase verifies that the regionPlan is updated whenever a region fails to open + * TestCase verifies that the regionPlan is updated whenever a region fails to open * and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546). */ @Test(timeout = 5000) @@ -795,7 +796,7 @@ public class TestAssignmentManager { this.gate.set(true); return randomServerName; } - + @Override public Map> retainAssignment( Map regions, List servers) { @@ -836,7 +837,7 @@ public class TestAssignmentManager { /** * Test verifies whether assignment is skipped for regions of tables in DISABLING state during * clean cluster startup. See HBASE-6281. - * + * * @throws KeeperException * @throws IOException * @throws Exception @@ -882,7 +883,7 @@ public class TestAssignmentManager { /** * Test verifies whether all the enabling table regions assigned only once during master startup. - * + * * @throws KeeperException * @throws IOException * @throws Exception @@ -902,7 +903,8 @@ public class TestAssignmentManager { try { // set table in enabling state. 
am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString()); - new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true) + new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), + am, new NullTableLockManager(), true).prepare() .process(); assertEquals("Number of assignments should be 1.", 1, assignmentCount); assertTrue("Table should be enabled.", @@ -1071,7 +1073,7 @@ public class TestAssignmentManager { ExecutorService executor = startupMasterExecutor("mockedAMExecutor"); this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration()); AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting( - server, manager, ct, this.balancer, executor); + server, manager, ct, this.balancer, executor, new NullTableLockManager()); return am; } @@ -1090,8 +1092,9 @@ public class TestAssignmentManager { public AssignmentManagerWithExtrasForTesting( final Server master, final ServerManager serverManager, final CatalogTracker catalogTracker, final LoadBalancer balancer, - final ExecutorService service) throws KeeperException, IOException { - super(master, serverManager, catalogTracker, balancer, service, null); + final ExecutorService service, final TableLockManager tableLockManager) + throws KeeperException, IOException { + super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager); this.es = service; this.ct = catalogTracker; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 428bd99..0465a9f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -315,6 +315,11 @@ public class TestCatalogJanitor { @Override public void deleteColumn(byte[] tableName, byte[] columnName) throws IOException { } + + @Override + public TableLockManager getTableLockManager() { + return null; + } } @Test diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java new file mode 100644 index 0000000..c8316f4 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java @@ -0,0 +1,294 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.LockTimeoutException; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests the default table lock manager + */ +@Category(MediumTests.class) +public class TestTableLockManager { + + private static final Log LOG = + LogFactory.getLog(TestTableLockManager.class); + + private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks"); + + private static final byte[] FAMILY = Bytes.toBytes("f1"); + + private static final byte[] NEW_FAMILY = Bytes.toBytes("f2"); + + private final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final CountDownLatch deleteColumn = new CountDownLatch(1); + private static final CountDownLatch addColumn = new CountDownLatch(1); + + public void prepareMiniCluster() throws Exception { + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + } + + public void prepareMiniZkCluster() throws Exception { + TEST_UTIL.startMiniZKCluster(1); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test(timeout = 600000) + public void testLockTimeoutException() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 3000); + prepareMiniCluster(); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class, + 0, TEST_UTIL.getConfiguration()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future shouldFinish = executor.submit(new Callable() { + @Override + public Object call() throws Exception { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.deleteColumn(TABLE_NAME, FAMILY); + return null; + } + }); + + deleteColumn.await(); + + try { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY)); + fail("Was expecting TableLockTimeoutException"); + } catch (LockTimeoutException ex) { + //expected + } + shouldFinish.get(); + } + + 
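The test above exercises the path a master-side schema operation takes when it cannot get the table lock in time. Roughly, the handler-side pattern looks like this hedged sketch; the type of the handle returned by writeLock() and its release() method are inferred from this patch's usage in testReapAllTableLocks and may differ in the final API:

  // Hypothetical sketch of guarding a schema change with the table lock.
  TableLockManager lockManager = TableLockManager.createTableLockManager(
      conf, zkWatcher, serverName);
  InterProcessLock lock = lockManager.writeLock(
      Bytes.toBytes("exampleTable"), "add column family f2");
  lock.acquire(); // throws LockTimeoutException once TABLE_WRITE_LOCK_TIMEOUT_MS elapses
  try {
    // ... perform the schema change ...
  } finally {
    lock.release();
  }
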
public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
+    @Override
+    public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      deleteColumn.countDown();
+    }
+    @Override
+    public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      Threads.sleep(10000);
+    }
+
+    @Override
+    public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      fail("Add column should have timed out acquiring the table lock");
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testAlterAndDisable() throws Exception {
+    prepareMiniCluster();
+    // Send a request to alter a table, then sleep during
+    // the alteration phase. In the meantime, from another
+    // thread, send a request to disable, and then delete, the table.
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
+        0, TEST_UTIL.getConfiguration());
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+        LOG.info("Added new column family");
+        HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
+        assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
+        return null;
+      }
+    });
+    Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.disableTable(TABLE_NAME);
+        assertTrue(admin.isTableDisabled(TABLE_NAME));
+        admin.deleteTable(TABLE_NAME);
+        assertFalse(admin.tableExists(TABLE_NAME));
+        return null;
+      }
+    });
+
+    try {
+      disableTableFuture.get();
+      alterTableFuture.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof AssertionError) {
+        throw (AssertionError) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
+    @Override
+    public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      LOG.debug("addColumn called");
+      addColumn.countDown();
+    }
+
+    @Override
+    public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      Threads.sleep(6000);
+      try {
+        ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
+      } catch (TableNotDisabledException expected) {
+        // pass
+        return;
+      } catch (IOException ex) {
+      }
+      fail("Was expecting the table to still be enabled");
+    }
+
+    @Override
+    public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      try {
+        LOG.debug("Waiting for addColumn to be processed first");
+        // wait for addColumn to be processed first
+        addColumn.await();
+        LOG.debug("addColumn started, we can continue");
+      } catch (InterruptedException ex) {
+        LOG.warn("Sleep interrupted while waiting for addColumn countdown");
+      }
+    }
+
+    @Override
+    public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      Threads.sleep(3000);
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testDelete() throws Exception {
+    prepareMiniCluster();
+
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    admin.disableTable(TABLE_NAME);
+
admin.deleteTable(TABLE_NAME); + + //ensure that znode for the table node has been deleted + ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher(); + + assertTrue(ZKUtil.checkExists(zkWatcher, + ZKUtil.joinZNode(zkWatcher.tableLockZNode, Bytes.toString(TABLE_NAME))) < 0); + + } + + + @Test(timeout = 600000) + public void testReapAllTableLocks() throws Exception { + prepareMiniZkCluster(); + ServerName serverName = new ServerName("localhost:10000", 0); + final TableLockManager lockManager = TableLockManager.createTableLockManager( + TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName); + + String tables[] = {"table1", "table2", "table3", "table4"}; + ExecutorService executor = Executors.newFixedThreadPool(6); + + final CountDownLatch writeLocksObtained = new CountDownLatch(4); + final CountDownLatch writeLocksAttempted = new CountDownLatch(10); + //TODO: read lock tables + + //6 threads will be stuck waiting for the table lock + for (int i = 0; i < tables.length; i++) { + final String table = tables[i]; + for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i] + executor.submit(new Callable() { + @Override + public Void call() throws Exception { + writeLocksAttempted.countDown(); + lockManager.writeLock(Bytes.toBytes(table), "testReapAllTableLocks").acquire(); + writeLocksObtained.countDown(); + return null; + } + }); + } + } + + writeLocksObtained.await(); + writeLocksAttempted.await(); + + //now reap all table locks + lockManager.reapAllTableWriteLocks(); + + TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0); + TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager( + TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName); + + //should not throw table lock timeout exception + zeroTimeoutLockManager.writeLock(Bytes.toBytes(tables[tables.length -1]), "zero timeout") + .acquire(); + + executor.shutdownNow(); + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java new file mode 100644 index 0000000..bea2b83 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java @@ -0,0 +1,359 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper.lock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.InterProcessLock; +import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(MediumTests.class) +public class TestZKInterProcessReadWriteLock { + + private static final Log LOG = + LogFactory.getLog(TestZKInterProcessReadWriteLock.class); + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final int NUM_THREADS = 10; + + private static Configuration conf; + + private final AtomicBoolean isLockHeld = new AtomicBoolean(false); + private final ExecutorService executor = + Executors.newFixedThreadPool(NUM_THREADS, + new DaemonThreadFactory("TestZKInterProcessReadWriteLock-")); + + @BeforeClass + public static void beforeAllTests() throws Exception { + conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniZKCluster(); + conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000); + ZooKeeperWatcher zkw = getZooKeeperWatcher("setup"); + ZKUtil.createWithParents(zkw, zkw.tableLockZNode); + } + + @AfterClass + public static void afterAllTests() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @After + public void tearDown() { + executor.shutdown(); + } + + private static ZooKeeperWatcher getZooKeeperWatcher(String desc) + throws IOException { + return TEST_UTIL.getZooKeeperWatcher(); + } + + + @Test(timeout = 30000) + public void testWriteLockExcludesWriters() throws Exception { + final String testName = "testWriteLockExcludesWriters"; + final ZKInterProcessReadWriteLock readWriteLock = + getReadWriteLock(testName); + List> results = Lists.newArrayList(); + for (int i = 0; i < NUM_THREADS; ++i) { + final String threadDesc = testName + i; + results.add(executor.submit(new Callable() { + @Override + public Void call() throws IOException { + ZKInterProcessWriteLock writeLock = + readWriteLock.writeLock(Bytes.toBytes(threadDesc)); + try { + writeLock.acquire(); + try { + // No one else should hold the lock + assertTrue(isLockHeld.compareAndSet(false, true)); + Thread.sleep(1000); + // No one else should have released the lock + assertTrue(isLockHeld.compareAndSet(true, false)); + } finally { + isLockHeld.set(false); + writeLock.release(); + } + } catch (InterruptedException e) 
{
+            LOG.warn(threadDesc + " interrupted", e);
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+          }
+          return null;
+        }
+      }));
+
+    }
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  @Test(timeout = 30000)
+  public void testReadLockDoesNotExcludeReaders() throws Exception {
+    final String testName = "testReadLockDoesNotExcludeReaders";
+    final ZKInterProcessReadWriteLock readWriteLock =
+        getReadWriteLock(testName);
+    final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
+    final AtomicInteger locksHeld = new AtomicInteger(0);
+    List<Future<Void>> results = Lists.newArrayList();
+    for (int i = 0; i < NUM_THREADS; ++i) {
+      final String threadDesc = testName + i;
+      results.add(executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          ZKInterProcessReadLock readLock =
+              readWriteLock.readLock(Bytes.toBytes(threadDesc));
+          readLock.acquire();
+          try {
+            locksHeld.incrementAndGet();
+            locksAcquiredLatch.countDown();
+            Thread.sleep(1000);
+          } finally {
+            readLock.release();
+            locksHeld.decrementAndGet();
+          }
+          return null;
+        }
+      }));
+    }
+    locksAcquiredLatch.await();
+    assertEquals(NUM_THREADS, locksHeld.get());
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  @Test(timeout = 30000)
+  public void testReadLockExcludesWriters() throws Exception {
+    // Submit a read lock request first
+    // Submit a write lock request second
+    final String testName = "testReadLockExcludesWriters";
+    List<Future<Void>> results = Lists.newArrayList();
+    final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> acquireReadLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireReadLock";
+        ZKInterProcessReadLock readLock =
+            getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+        readLock.acquire();
+        try {
+          assertTrue(isLockHeld.compareAndSet(false, true));
+          readLockAcquiredLatch.countDown();
+          Thread.sleep(1000);
+        } finally {
+          isLockHeld.set(false);
+          readLock.release();
+        }
+        return null;
+      }
+    };
+    Callable<Void> acquireWriteLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireWriteLock";
+        ZKInterProcessWriteLock writeLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        readLockAcquiredLatch.await();
+        assertTrue(isLockHeld.get());
+        writeLock.acquire();
+        try {
+          assertFalse(isLockHeld.get());
+        } finally {
+          writeLock.release();
+        }
+        return null;
+      }
+    };
+    results.add(executor.submit(acquireReadLock));
+    results.add(executor.submit(acquireWriteLock));
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
+      throws IOException {
+    MetadataHandler handler = new MetadataHandler() {
+      @Override
+      public void handleMetadata(byte[] ownerMetadata) {
+        LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
+      }
+    };
+    ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
+    String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
+
+    return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
+  }
+
+  @Test(timeout = 30000)
+  public void testWriteLockExcludesReaders() throws Exception {
+    // Submit a write lock request first
+    // Submit a read lock request second
+    final String testName = "testWriteLockExcludesReaders";
+    List<Future<Void>> results = Lists.newArrayList();
+    final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> acquireWriteLock = new
Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-acquireWriteLock"; + ZKInterProcessWriteLock writeLock = + getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc)); + writeLock.acquire(); + try { + writeLockAcquiredLatch.countDown(); + assertTrue(isLockHeld.compareAndSet(false, true)); + Thread.sleep(1000); + } finally { + isLockHeld.set(false); + writeLock.release(); + } + return null; + } + }; + Callable acquireReadLock = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-acquireReadLock"; + ZKInterProcessReadLock readLock = + getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc)); + writeLockAcquiredLatch.await(); + readLock.acquire(); + try { + assertFalse(isLockHeld.get()); + } finally { + readLock.release(); + } + return null; + } + }; + results.add(executor.submit(acquireWriteLock)); + results.add(executor.submit(acquireReadLock)); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 60000) + public void testTimeout() throws Exception { + final String testName = "testTimeout"; + final CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + Callable shouldHog = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-shouldHog"; + ZKInterProcessWriteLock lock = + getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc)); + lock.acquire(); + lockAcquiredLatch.countDown(); + Thread.sleep(10000); + lock.release(); + return null; + } + }; + Callable shouldTimeout = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-shouldTimeout"; + ZKInterProcessWriteLock lock = + getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc)); + lockAcquiredLatch.await(); + assertFalse(lock.tryAcquire(5000)); + return null; + } + }; + Callable shouldAcquireLock = new Callable() { + @Override + public Void call() throws Exception { + final String threadDesc = testName + "-shouldAcquireLock"; + ZKInterProcessWriteLock lock = + getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc)); + lockAcquiredLatch.await(); + assertTrue(lock.tryAcquire(30000)); + lock.release(); + return null; + } + }; + List> results = Lists.newArrayList(); + results.add(executor.submit(shouldHog)); + results.add(executor.submit(shouldTimeout)); + results.add(executor.submit(shouldAcquireLock)); + MultithreadedTestUtil.assertOnFutures(results); + } + + @Test(timeout = 60000) + public void testMultipleClients() throws Exception { + //tests lock usage from multiple zookeeper clients with different sessions. + //acquire one read lock, then one write lock + final String testName = "testMultipleClients"; + + //different zookeeper sessions with separate identifiers + ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null); + ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null); + + String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName); + + ZKInterProcessReadWriteLock clientLock1 + = new ZKInterProcessReadWriteLock(zkWatcher1, znode, null); + ZKInterProcessReadWriteLock clientLock2 + = new ZKInterProcessReadWriteLock(zkWatcher2, znode, null); + + InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1")); + lock1.acquire(); + + //try to acquire, but it will timeout. 
We are testing whether this will cause any problems + //due to the read lock being from another client + InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2")); + assertFalse(lock2.tryAcquire(1000)); + + lock1.release(); + + //this time it will acquire + assertTrue(lock2.tryAcquire(5000)); + lock2.release(); + zkWatcher1.close(); + zkWatcher2.close(); + } +}
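
Taken together, the patch builds the following znode layout; a sketch assuming the default /hbase base znode, with an invented table name and sequence numbers:

  // /hbase/table-lock             <- zkWatcher.tableLockZNode, created with createWithParents()
  //   exampleTable                <- parentLockNode, one child per table
  //     write-0000000000          <- EPHEMERAL_SEQUENTIAL, WRITE_LOCK_CHILD_NODE_PREFIX
  //     read-0000000001           <- EPHEMERAL_SEQUENTIAL, READ_LOCK_CHILD_NODE_PREFIX
  //     read-0000000002
  // A request is granted once no conflicting child with a lower sequence ID
  // remains; ephemeral nodes guarantee cleanup if the owner's session dies.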