Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1377146) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -111,10 +111,9 @@ Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); - ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name())); + ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", ReplicationZookeeper.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); + ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationZookeeper.ENABLED_ZNODE_BYTES); replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); manager = replication.getReplicationManager(); @@ -135,8 +134,6 @@ htd.addFamily(col); hri = new HRegionInfo(htd.getName(), r1, r2); - - } @AfterClass Index: hbase-server/src/main/protobuf/ZooKeeper.proto =================================================================== --- hbase-server/src/main/protobuf/ZooKeeper.proto (revision 1377146) +++ hbase-server/src/main/protobuf/ZooKeeper.proto (working copy) @@ -98,3 +98,24 @@ // for more. required State state = 1 [default = ENABLED]; } + + +/** + * Used by replication. Holds a replication peer key. + */ +message ReplicationPeer { + // clusterKey is the concatenation of the slave cluster's + // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent + required string clusterkey = 1; +} + +/** + * Used by replication. 
Holds whether enabled or disabled + */ +message ReplicationState { + enum State { + ENABLED = 0; + DISABLED = 1; + } + required State state = 1; +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (working copy) @@ -3230,6 +3230,879 @@ // @@protoc_insertion_point(class_scope:Table) } + public interface ReplicationPeerOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string clusterkey = 1; + boolean hasClusterkey(); + String getClusterkey(); + } + public static final class ReplicationPeer extends + com.google.protobuf.GeneratedMessage + implements ReplicationPeerOrBuilder { + // Use ReplicationPeer.newBuilder() to construct. + private ReplicationPeer(Builder builder) { + super(builder); + } + private ReplicationPeer(boolean noInit) {} + + private static final ReplicationPeer defaultInstance; + public static ReplicationPeer getDefaultInstance() { + return defaultInstance; + } + + public ReplicationPeer getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationPeer_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationPeer_fieldAccessorTable; + } + + private int bitField0_; + // required string clusterkey = 1; + public static final int CLUSTERKEY_FIELD_NUMBER = 1; + private java.lang.Object clusterkey_; + public boolean hasClusterkey() { + return ((bitField0_ & 0x00000001) == 
0x00000001); + } + public String getClusterkey() { + java.lang.Object ref = clusterkey_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + clusterkey_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getClusterkeyBytes() { + java.lang.Object ref = clusterkey_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + clusterkey_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + clusterkey_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasClusterkey()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getClusterkeyBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getClusterkeyBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object 
obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer) obj; + + boolean result = true; + result = result && (hasClusterkey() == other.hasClusterkey()); + if (hasClusterkey()) { + result = result && getClusterkey() + .equals(other.getClusterkey()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasClusterkey()) { + hash = (37 * hash) + CLUSTERKEY_FIELD_NUMBER; + hash = (53 * hash) + getClusterkey().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws 
com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public 
Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeerOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationPeer_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationPeer_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + clusterkey_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.getDescriptor(); + } + + public 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.clusterkey_ = clusterkey_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer other) { + if (other == 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.getDefaultInstance()) return this; + if (other.hasClusterkey()) { + setClusterkey(other.getClusterkey()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasClusterkey()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + clusterkey_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required string clusterkey = 1; + private java.lang.Object clusterkey_ = ""; + public boolean hasClusterkey() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getClusterkey() { + java.lang.Object ref = clusterkey_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + clusterkey_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setClusterkey(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + clusterkey_ = value; + onChanged(); + return this; + } + public Builder clearClusterkey() { + bitField0_ = (bitField0_ & ~0x00000001); + clusterkey_ = getDefaultInstance().getClusterkey(); + onChanged(); + return this; + } + void setClusterkey(com.google.protobuf.ByteString value) { + bitField0_ 
|= 0x00000001; + clusterkey_ = value; + onChanged(); + } + + // @@protoc_insertion_point(builder_scope:ReplicationPeer) + } + + static { + defaultInstance = new ReplicationPeer(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:ReplicationPeer) + } + + public interface ReplicationStateOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required .ReplicationState.State state = 1; + boolean hasState(); + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State getState(); + } + public static final class ReplicationState extends + com.google.protobuf.GeneratedMessage + implements ReplicationStateOrBuilder { + // Use ReplicationState.newBuilder() to construct. + private ReplicationState(Builder builder) { + super(builder); + } + private ReplicationState(boolean noInit) {} + + private static final ReplicationState defaultInstance; + public static ReplicationState getDefaultInstance() { + return defaultInstance; + } + + public ReplicationState getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationState_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationState_fieldAccessorTable; + } + + public enum State + implements com.google.protobuf.ProtocolMessageEnum { + ENABLED(0, 0), + DISABLED(1, 1), + ; + + public static final int ENABLED_VALUE = 0; + public static final int DISABLED_VALUE = 1; + + + public final int getNumber() { return value; } + + public static State valueOf(int value) { + switch (value) { + case 0: return ENABLED; + case 1: return DISABLED; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + 
internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public State findValueByNumber(int number) { + return State.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.getDescriptor().getEnumTypes().get(0); + } + + private static final State[] VALUES = { + ENABLED, DISABLED, + }; + + public static State valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private State(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:ReplicationState.State) + } + + private int bitField0_; + // required .ReplicationState.State state = 1; + public static final int STATE_FIELD_NUMBER = 1; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State state_; + public boolean hasState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State getState() { + return state_; + } + + private void initFields() { + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State.ENABLED; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized 
= memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasState()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, state_.getNumber()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, state_.getNumber()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState) obj; + + boolean result = true; + result = result && (hasState() == other.hasState()); + if (hasState()) { + result = result && + (getState() == other.getState()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasState()) { + hash = (37 * hash) + STATE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getState()); + } + hash = (29 
* hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + 
return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationStateOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationState_descriptor; + } + + protected 
com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_ReplicationState_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State.ENABLED; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState result = buildPartial(); + if (!result.isInitialized()) { + 
throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.state_ = state_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.getDefaultInstance()) return this; + if (other.hasState()) { + setState(other.getState()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasState()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + 
this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State value = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + state_ = value; + } + break; + } + } + } + } + + private int bitField0_; + + // required .ReplicationState.State state = 1; + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State.ENABLED; + public boolean hasState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State getState() { + return state_; + } + public Builder setState(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + state_ = value; + onChanged(); + return this; + } + public Builder clearState() { + bitField0_ = (bitField0_ & ~0x00000001); + state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.State.ENABLED; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:ReplicationState) + } + + static { + defaultInstance = new ReplicationState(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:ReplicationState) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_RootRegionServer_descriptor; private static @@ -3260,6 +4133,16 @@ private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_Table_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + 
internal_static_ReplicationPeer_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_ReplicationPeer_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_ReplicationState_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_ReplicationState_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -3282,9 +4165,13 @@ "\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003" + "ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.St" + "ate:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DI" + - "SABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003BE\n" + - "*org.apache.hadoop.hbase.protobuf.genera" + - "tedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" + "SABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n" + + "\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020" + + "ReplicationState\022&\n\005state\030\001 \002(\0162\027.Replic" + + "ationState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014" + + "\n\010DISABLED\020\001BE\n*org.apache.hadoop.hbase." 
+ + "protobuf.generatedB\017ZooKeeperProtosH\001\210\001\001" + + "\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3339,6 +4226,22 @@ new java.lang.String[] { "State", }, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table.Builder.class); + internal_static_ReplicationPeer_descriptor = + getDescriptor().getMessageTypes().get(6); + internal_static_ReplicationPeer_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_ReplicationPeer_descriptor, + new java.lang.String[] { "Clusterkey", }, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.class, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer.Builder.class); + internal_static_ReplicationState_descriptor = + getDescriptor().getMessageTypes().get(7); + internal_static_ReplicationState_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_ReplicationState_descriptor, + new java.lang.String[] { "State", }, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.class, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationState.Builder.class); return null; } }; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/FilterProtos.java (working copy) @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! 
-// source: filter.proto +// source: Filter.proto package org.apache.hadoop.hbase.protobuf.generated; @@ -11041,7 +11041,7 @@ descriptor; static { java.lang.String[] descriptorData = { - "\n\014filter.proto\032\013hbase.proto\"%\n\024ColumnCou" + + "\n\014Filter.proto\032\013hbase.proto\"%\n\024ColumnCou" + "ntGetFilter\022\r\n\005limit\030\001 \002(\005\"7\n\026ColumnPagi" + "nationFilter\022\r\n\005limit\030\001 \002(\005\022\016\n\006offset\030\002 " + "\001(\005\"$\n\022ColumnPrefixFilter\022\016\n\006prefix\030\001 \002(" + Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -164,20 +164,6 @@ } /** - * Get state of the peer - * - * @param id peer's identifier - * @return current state of the peer - */ - public String getPeerState(String id) throws IOException { - try { - return this.replicationZk.getPeerState(id).name(); - } catch (KeeperException e) { - throw new IOException("Couldn't get the state of the peer " + id, e); - } - } - - /** * Get the current status of the kill switch, if the cluster is replicating * or not. 
* @return true if the cluster is replicated, otherwise false @@ -221,4 +207,4 @@ this.connection.close(); } } -} +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy) @@ -30,9 +30,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -80,21 +79,19 @@ * @throws KeeperException */ public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) - throws KeeperException { - if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) { - ZKUtil.createAndWatch(zookeeper, peerStateNode, - Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default + throws KeeperException { + ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode); + this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); + this.peerStateTracker.start(); + try { + this.readPeerStateZnode(); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); } - this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, - this); - this.peerStateTracker.start(); - this.readPeerStateZnode(); } - private void readPeerStateZnode() { - String currentState = Bytes.toString(peerStateTracker.getData(false)); - 
this.peerEnabled.set(PeerState.ENABLED.equals(PeerState - .valueOf(currentState))); + private void readPeerStateZnode() throws DeserializationException { + this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false))); } /** @@ -199,7 +196,11 @@ public synchronized void nodeDataChanged(String path) { if (path.equals(node)) { super.nodeDataChanged(path); - readPeerStateZnode(); + try { + readPeerStateZnode(); + } catch (DeserializationException e) { + LOG.warn("Failed deserializing the content of " + path, e); + } } } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -478,7 +478,6 @@ * Watcher used to follow the creation and deletion of peer clusters. */ public class PeersWatcher extends ZooKeeperListener { - /** * Construct a ZooKeeper event listener. 
*/ Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -94,7 +94,7 @@ "(replicating=" + this.replicating, ke); } this.replicationManager = new ReplicationSourceManager(zkHelper, conf, - this.server, fs, this.replicating, logDir, oldLogDir) ; + this.server, fs, this.replicating, logDir, oldLogDir); } else { this.replicationManager = null; this.zkHelper = null; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1377146) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -37,9 +37,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; @@ -47,8 +50,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import 
org.apache.zookeeper.KeeperException.SessionExpiredException; +import com.google.protobuf.InvalidProtocolBufferException; + /** * This class serves as a helper for all things related to zookeeper in * replication. @@ -59,22 +65,14 @@ * *
  * replication/
- *  state      {contains true or false}
- *  clusterId  {contains a byte}
+ *  state      {contains ENABLED or DISABLED}
  *  peers/
- *    1/   {contains a full cluster address}
+ *    1/   {contains full cluster address which is ZK ensemble's addresses, client port and root znode}
  *      peer-state  {contains ENABLED or DISABLED}
  *    2/
  *    ...
  *  rs/ {lists all RS that replicate}
- *    startcode1/ {lists all peer clusters}
- *      1/ {lists hlogs to process}
- *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
- *        10.10.1.76%3A53488.123456790
- *        ...
- *      2/
- *      ...
- *    startcode2/
+ *    1.example.org,1234,5678 {Full ServerName}
  *    ...
  * 
*/ @@ -85,11 +83,6 @@ // Name of znode we use to lock when failover private final static String RS_LOCK_ZNODE = "lock"; - // Values of znode which stores state of a peer - public static enum PeerState { - ENABLED, DISABLED - }; - // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; // Map of peer clusters keyed by their id @@ -104,7 +97,8 @@ private String rsServerNameZnode; // Name node if the replicationState znode private String replicationStateNodeName; - // Name of zk node which stores peer state + // Name of zk node which stores peer state. The peer-state znode is under a peers' id node; + // e.g. /hbase/replication/peers/PEER_ID/peer-state private String peerStateNodeName; private final Configuration conf; // Is this cluster replicating at the moment? @@ -116,15 +110,27 @@ private ReplicationStatusTracker statusTracker; /** + * ZNode content if enabled state. + */ + // Public so can be seen by test code. + public static final byte [] ENABLED_ZNODE_BYTES = + toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED); + + /** + * ZNode content if disabled state. 
+ */ + static final byte [] DISABLED_ZNODE_BYTES = + toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED); + + /** * Constructor used by clients of replication (like master and HBase clients) * @param conf conf to use * @param zk zk connection to use * @throws IOException */ public ReplicationZookeeper(final Abortable abortable, final Configuration conf, - final ZooKeeperWatcher zk) - throws KeeperException { - + final ZooKeeperWatcher zk) + throws KeeperException { this.conf = conf; this.zookeeper = zk; this.replicating = new AtomicBoolean(); @@ -149,34 +155,27 @@ this.peerClusters = new HashMap(); ZKUtil.createWithParents(this.zookeeper, - ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); + ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString()); ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); connectExistingPeers(); } private void setZNodes(Abortable abortable) throws KeeperException { - String replicationZNodeName = - conf.get("zookeeper.znode.replication", "replication"); - String peersZNodeName = - conf.get("zookeeper.znode.replication.peers", "peers"); - this.peerStateNodeName = conf.get( - "zookeeper.znode.replication.peers.state", "peer-state"); - this.replicationStateNodeName = - conf.get("zookeeper.znode.replication.state", "state"); - String rsZNodeName = - conf.get("zookeeper.znode.replication.rs", "rs"); + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state"); + String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); - 
this.replicationZNode = - ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); + this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); ZKUtil.createWithParents(this.zookeeper, this.peersZNode); this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); ZKUtil.createWithParents(this.zookeeper, this.rsZNode); // Set a tracker on replicationStateNodeNode - this.statusTracker = - new ReplicationStatusTracker(this.zookeeper, abortable); + this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable); statusTracker.start(); readReplicationStateZnode(); } @@ -214,14 +213,22 @@ try { ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); for (String id : ids) { - peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper, - ZKUtil.joinZNode(this.peersZNode, id)))); + byte [] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)); + String clusterKey = null; + try { + clusterKey = parsePeerFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing."); + continue; + } + peers.put(id, clusterKey); } } catch (KeeperException e) { this.abortable.abort("Cannot get the list of peers ", e); } return peers; } + /** * Returns all region servers from given peer * @@ -402,14 +409,77 @@ } ZKUtil.createWithParents(this.zookeeper, this.peersZNode); ZKUtil.createAndWatch(this.zookeeper, - ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey)); - ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), - Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default + ZKUtil.joinZNode(this.peersZNode, id), toByteArray(clusterKey)); + ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES); // enabled by default } catch (KeeperException e) { throw new IOException("Unable to add peer", e); } } + /** + * @param 
clusterKey + * @return Serialized protobuf of clusterKey with pb magic prefix prepended suitable + * for use as content of this.peersZNode; i.e. the content of PEER_ID znode under + * /hbase/replication/peers/PEER_ID + */ + static byte [] toByteArray(final String clusterKey) { + ZooKeeperProtos.ReplicationPeer.Builder builder = + ZooKeeperProtos.ReplicationPeer.newBuilder(); + builder.setClusterkey(clusterKey); + return ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + } + + /** + * @param state + * @return Serialized protobuf of state with pb magic prefix prepended suitable for + * use as content of either the cluster state znode -- whether or not we should be replicating + * kept in /hbase/replication/state -- or as content of a peer-state znode under a peer cluster id + * as in /hbase/replication/peers/PEER_ID/peer-state. + */ + static byte [] toByteArray(final ZooKeeperProtos.ReplicationState.State state) { + ZooKeeperProtos.ReplicationState.Builder builder = + ZooKeeperProtos.ReplicationState.newBuilder(); + builder.setState(state); + return ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + } + + /** + * @param bytes Content of a peer znode. + * @return ClusterKey parsed from the passed bytes.
+ * @throws DeserializationException + */ + static String parsePeerFrom(final byte [] bytes) throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationPeer.Builder builder = + ZooKeeperProtos.ReplicationPeer.newBuilder(); + ZooKeeperProtos.ReplicationPeer peer; + try { + peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + return peer.getClusterkey(); + } else { + return Bytes.toString(bytes); + } + } + + static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte [] bytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(bytes); + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationState.Builder builder = + ZooKeeperProtos.ReplicationState.newBuilder(); + ZooKeeperProtos.ReplicationState state; + try { + state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); + return state.getState(); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + } + private boolean peerExists(String id) throws KeeperException { return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; @@ -423,7 +493,7 @@ * Thrown when the peer doesn't exist */ public void enablePeer(String id) throws IOException { - changePeerState(id, PeerState.ENABLED); + changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED); LOG.info("peer " + id + " is enabled"); } @@ -435,11 +505,12 @@ * Thrown when the peer doesn't exist */ public void disablePeer(String id) throws IOException { - changePeerState(id, PeerState.DISABLED); + changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED); LOG.info("peer " + id + " is disabled"); } - private void changePeerState(String id, PeerState state) throws IOException { + private void changePeerState(String id, 
ZooKeeperProtos.ReplicationState.State state) + throws IOException { try { if (!peerExists(id)) { throw new IllegalArgumentException("peer " + id + " is not registered"); @@ -447,7 +518,8 @@ String peerStateZNode = getPeerStateNode(id); if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) { ZKUtil.setData(this.zookeeper, peerStateZNode, - Bytes.toBytes(state.name())); + state == ZooKeeperProtos.ReplicationState.State.ENABLED? + ENABLED_ZNODE_BYTES: DISABLED_ZNODE_BYTES); } else { ZKUtil.createAndWatch(zookeeper, peerStateZNode, Bytes.toBytes(state.name())); @@ -459,18 +531,6 @@ } /** - * Get state of the peer. This method checks the state by connecting to ZK. - * - * @param id peer's identifier - * @return current state of the peer - */ - public PeerState getPeerState(String id) throws KeeperException { - byte[] peerStateBytes = ZKUtil - .getData(this.zookeeper, getPeerStateNode(id)); - return PeerState.valueOf(Bytes.toString(peerStateBytes)); - } - - /** * Check whether the peer is enabled or not. This method checks the atomic * boolean of ReplicationPeer locally. * @@ -487,8 +547,7 @@ } private String getPeerStateNode(String id) { - return ZKUtil.joinZNode(this.peersZNode, - ZKUtil.joinZNode(id, this.peerStateNodeName)); + return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); } /** @@ -516,7 +575,11 @@ setReplicating(true); return true; } - return Boolean.parseBoolean(Bytes.toString(data)); + try { + return isPeerEnabled(data); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } } private String getRepStateNode() { @@ -847,6 +910,33 @@ } /** + * Utility method to ensure an ENABLED znode is in place; if not present, we create it. + * @param zookeeper + * @param path Path to znode to check + * @return True if we created the znode. 
+ * @throws NodeExistsException + * @throws KeeperException + */ + static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path) + throws NodeExistsException, KeeperException { + if (ZKUtil.checkExists(zookeeper, path) == -1) { + ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES); + return true; + } + return false; + } + + /** + * @param bytes + * @return True if the passed in bytes are those of a pb serialized ENABLED state. + * @throws DeserializationException + */ + static boolean isPeerEnabled(final byte [] bytes) throws DeserializationException { + ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); + return ZooKeeperProtos.ReplicationState.State.ENABLED == state; + } + + /** * Tracker for status of the replication */ public class ReplicationStatusTracker extends ZooKeeperNodeTracker { @@ -863,4 +953,4 @@ } } } -} +} \ No newline at end of file