diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java new file mode 100644 index 0000000..02f3cbb --- /dev/null +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java @@ -0,0 +1,931 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: WAL.proto + +package org.apache.hadoop.hbase.protobuf.generated; + +public final class WAL { + private WAL() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface CompactionOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bytes tableName = 1; + boolean hasTableName(); + com.google.protobuf.ByteString getTableName(); + + // required bytes encodedRegionName = 2; + boolean hasEncodedRegionName(); + com.google.protobuf.ByteString getEncodedRegionName(); + + // required bytes familyName = 3; + boolean hasFamilyName(); + com.google.protobuf.ByteString getFamilyName(); + + // repeated string compactionInput = 4; + java.util.List getCompactionInputList(); + int getCompactionInputCount(); + String getCompactionInput(int index); + + // required string compactedFile = 5; + boolean hasCompactedFile(); + String getCompactedFile(); + + // required string storeHomeDir = 6; + boolean hasStoreHomeDir(); + String getStoreHomeDir(); + } + public static final class Compaction extends + com.google.protobuf.GeneratedMessage + implements CompactionOrBuilder { + // Use Compaction.newBuilder() to construct. + private Compaction(Builder builder) { + super(builder); + } + private Compaction(boolean noInit) {} + + private static final Compaction defaultInstance; + public static Compaction getDefaultInstance() { + return defaultInstance; + } + + public Compaction getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_Compaction_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_Compaction_fieldAccessorTable; + } + + private int bitField0_; + // required bytes tableName = 1; + public static final int TABLENAME_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString tableName_; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + + // required bytes encodedRegionName = 2; + public static final int ENCODEDREGIONNAME_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString encodedRegionName_; + public boolean hasEncodedRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getEncodedRegionName() { + return encodedRegionName_; + } + + // required bytes familyName = 3; + public static final int FAMILYNAME_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString familyName_; + public boolean hasFamilyName() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public com.google.protobuf.ByteString getFamilyName() { + return familyName_; + } + + // repeated string compactionInput = 4; + public static final int COMPACTIONINPUT_FIELD_NUMBER = 4; + private com.google.protobuf.LazyStringList compactionInput_; + public java.util.List + 
getCompactionInputList() { + return compactionInput_; + } + public int getCompactionInputCount() { + return compactionInput_.size(); + } + public String getCompactionInput(int index) { + return compactionInput_.get(index); + } + + // required string compactedFile = 5; + public static final int COMPACTEDFILE_FIELD_NUMBER = 5; + private java.lang.Object compactedFile_; + public boolean hasCompactedFile() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public String getCompactedFile() { + java.lang.Object ref = compactedFile_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + compactedFile_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getCompactedFileBytes() { + java.lang.Object ref = compactedFile_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + compactedFile_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required string storeHomeDir = 6; + public static final int STOREHOMEDIR_FIELD_NUMBER = 6; + private java.lang.Object storeHomeDir_; + public boolean hasStoreHomeDir() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getStoreHomeDir() { + java.lang.Object ref = storeHomeDir_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + storeHomeDir_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getStoreHomeDirBytes() { + java.lang.Object ref = storeHomeDir_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + storeHomeDir_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + tableName_ = com.google.protobuf.ByteString.EMPTY; + encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + familyName_ = com.google.protobuf.ByteString.EMPTY; + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + compactedFile_ = ""; + storeHomeDir_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTableName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasEncodedRegionName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasFamilyName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasCompactedFile()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasStoreHomeDir()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, encodedRegionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, familyName_); + } + for (int i = 0; i < compactionInput_.size(); i++) { + output.writeBytes(4, compactionInput_.getByteString(i)); + } + if (((bitField0_ & 
0x00000008) == 0x00000008)) { + output.writeBytes(5, getCompactedFileBytes()); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(6, getStoreHomeDirBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, encodedRegionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, familyName_); + } + { + int dataSize = 0; + for (int i = 0; i < compactionInput_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(compactionInput_.getByteString(i)); + } + size += dataSize; + size += 1 * getCompactionInputList().size(); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, getCompactedFileBytes()); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(6, getStoreHomeDirBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction other = (org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction) obj; + + boolean result = true; + result = result && (hasTableName() == other.hasTableName()); + if (hasTableName()) { + result = result && getTableName() + .equals(other.getTableName()); + } + result = result && (hasEncodedRegionName() == other.hasEncodedRegionName()); + if (hasEncodedRegionName()) { + result = result && getEncodedRegionName() + .equals(other.getEncodedRegionName()); + } + result = result && (hasFamilyName() == other.hasFamilyName()); + if (hasFamilyName()) { + result = result && getFamilyName() + .equals(other.getFamilyName()); + } + result = result && getCompactionInputList() + .equals(other.getCompactionInputList()); + result = result && (hasCompactedFile() == other.hasCompactedFile()); + if (hasCompactedFile()) { + result = result && getCompactedFile() + .equals(other.getCompactedFile()); + } + result = result && (hasStoreHomeDir() == other.hasStoreHomeDir()); + if (hasStoreHomeDir()) { + result = result && getStoreHomeDir() + .equals(other.getStoreHomeDir()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasTableName()) { + hash = (37 * hash) + TABLENAME_FIELD_NUMBER; + hash = (53 * hash) + getTableName().hashCode(); + } + if (hasEncodedRegionName()) { + hash = (37 * hash) + ENCODEDREGIONNAME_FIELD_NUMBER; + hash = (53 * hash) + getEncodedRegionName().hashCode(); + } + if 
(hasFamilyName()) { + hash = (37 * hash) + FAMILYNAME_FIELD_NUMBER; + hash = (53 * hash) + getFamilyName().hashCode(); + } + if (getCompactionInputCount() > 0) { + hash = (37 * hash) + COMPACTIONINPUT_FIELD_NUMBER; + hash = (53 * hash) + getCompactionInputList().hashCode(); + } + if (hasCompactedFile()) { + hash = (37 * hash) + COMPACTEDFILE_FIELD_NUMBER; + hash = (53 * hash) + getCompactedFile().hashCode(); + } + if (hasStoreHomeDir()) { + hash = (37 * hash) + STOREHOMEDIR_FIELD_NUMBER; + hash = (53 * hash) + getStoreHomeDir().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static 
Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_Compaction_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_Compaction_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + tableName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + familyName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + compactedFile_ = ""; + bitField0_ = (bitField0_ & ~0x00000010); + storeHomeDir_ = ""; + bitField0_ = (bitField0_ & ~0x00000020); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction build() { + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction result = new org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.tableName_ = tableName_; + if (((from_bitField0_ & 0x00000002) == 
0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.encodedRegionName_ = encodedRegionName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.familyName_ = familyName_; + if (((bitField0_ & 0x00000008) == 0x00000008)) { + compactionInput_ = new com.google.protobuf.UnmodifiableLazyStringList( + compactionInput_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.compactionInput_ = compactionInput_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.compactedFile_ = compactedFile_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000010; + } + result.storeHomeDir_ = storeHomeDir_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.getDefaultInstance()) return this; + if (other.hasTableName()) { + setTableName(other.getTableName()); + } + if (other.hasEncodedRegionName()) { + setEncodedRegionName(other.getEncodedRegionName()); + } + if (other.hasFamilyName()) { + setFamilyName(other.getFamilyName()); + } + if (!other.compactionInput_.isEmpty()) { + if (compactionInput_.isEmpty()) { + compactionInput_ = other.compactionInput_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureCompactionInputIsMutable(); + compactionInput_.addAll(other.compactionInput_); + } + onChanged(); + } + if (other.hasCompactedFile()) { + setCompactedFile(other.getCompactedFile()); + } + if (other.hasStoreHomeDir()) { + setStoreHomeDir(other.getStoreHomeDir()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTableName()) { + + return false; + } + if (!hasEncodedRegionName()) { + + return false; + } + if (!hasFamilyName()) { + + return false; + } + if (!hasCompactedFile()) { + + return false; + } + if (!hasStoreHomeDir()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + tableName_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + encodedRegionName_ = input.readBytes(); + break; + } + case 26: { + bitField0_ |= 0x00000004; + familyName_ = input.readBytes(); + break; + } + case 34: { + ensureCompactionInputIsMutable(); + compactionInput_.add(input.readBytes()); + break; + } + case 42: { + bitField0_ |= 0x00000010; + compactedFile_ = input.readBytes(); + break; + } + case 50: { + bitField0_ |= 0x00000020; + storeHomeDir_ = 
input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required bytes tableName = 1; + private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + public Builder setTableName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + tableName_ = value; + onChanged(); + return this; + } + public Builder clearTableName() { + bitField0_ = (bitField0_ & ~0x00000001); + tableName_ = getDefaultInstance().getTableName(); + onChanged(); + return this; + } + + // required bytes encodedRegionName = 2; + private com.google.protobuf.ByteString encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasEncodedRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getEncodedRegionName() { + return encodedRegionName_; + } + public Builder setEncodedRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + encodedRegionName_ = value; + onChanged(); + return this; + } + public Builder clearEncodedRegionName() { + bitField0_ = (bitField0_ & ~0x00000002); + encodedRegionName_ = getDefaultInstance().getEncodedRegionName(); + onChanged(); + return this; + } + + // required bytes familyName = 3; + private com.google.protobuf.ByteString familyName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasFamilyName() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public com.google.protobuf.ByteString getFamilyName() { + return familyName_; + } + public Builder setFamilyName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + familyName_ = value; + onChanged(); + return this; + } + public Builder clearFamilyName() { + bitField0_ = (bitField0_ & ~0x00000004); + familyName_ = getDefaultInstance().getFamilyName(); + onChanged(); + return this; + } + + // repeated string compactionInput = 4; + private com.google.protobuf.LazyStringList compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureCompactionInputIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + compactionInput_ = new com.google.protobuf.LazyStringArrayList(compactionInput_); + bitField0_ |= 0x00000008; + } + } + public java.util.List + getCompactionInputList() { + return java.util.Collections.unmodifiableList(compactionInput_); + } + public int getCompactionInputCount() { + return compactionInput_.size(); + } + public String getCompactionInput(int index) { + return compactionInput_.get(index); + } + public Builder setCompactionInput( + int index, String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureCompactionInputIsMutable(); + compactionInput_.set(index, value); + onChanged(); + return this; + } + public Builder addCompactionInput(String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureCompactionInputIsMutable(); + compactionInput_.add(value); + onChanged(); + return this; + } + public Builder addAllCompactionInput( + java.lang.Iterable values) { + ensureCompactionInputIsMutable(); + super.addAll(values, compactionInput_); + onChanged(); + return this; + } + public Builder 
clearCompactionInput() { + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + onChanged(); + return this; + } + void addCompactionInput(com.google.protobuf.ByteString value) { + ensureCompactionInputIsMutable(); + compactionInput_.add(value); + onChanged(); + } + + // required string compactedFile = 5; + private java.lang.Object compactedFile_ = ""; + public boolean hasCompactedFile() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getCompactedFile() { + java.lang.Object ref = compactedFile_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + compactedFile_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setCompactedFile(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + compactedFile_ = value; + onChanged(); + return this; + } + public Builder clearCompactedFile() { + bitField0_ = (bitField0_ & ~0x00000010); + compactedFile_ = getDefaultInstance().getCompactedFile(); + onChanged(); + return this; + } + void setCompactedFile(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000010; + compactedFile_ = value; + onChanged(); + } + + // required string storeHomeDir = 6; + private java.lang.Object storeHomeDir_ = ""; + public boolean hasStoreHomeDir() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public String getStoreHomeDir() { + java.lang.Object ref = storeHomeDir_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + storeHomeDir_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setStoreHomeDir(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + storeHomeDir_ = value; + onChanged(); + return this; + } + public Builder clearStoreHomeDir() { + bitField0_ = (bitField0_ & ~0x00000020); + storeHomeDir_ = getDefaultInstance().getStoreHomeDir(); + onChanged(); + return this; + } + void setStoreHomeDir(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000020; + storeHomeDir_ = value; + onChanged(); + } + + // @@protoc_insertion_point(builder_scope:Compaction) + } + + static { + defaultInstance = new Compaction(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:Compaction) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_Compaction_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_Compaction_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\tWAL.proto\032\013hbase.proto\"\224\001\n\nCompaction\022" + + "\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedRegionName\030" + + "\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017compactionI" + + "nput\030\004 \003(\t\022\025\n\rcompactedFile\030\005 \002(\t\022\024\n\014sto" + + "reHomeDir\030\006 \002(\tB6\n*org.apache.hadoop.hba" + + "se.protobuf.generatedB\003WALH\001\240\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry 
assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_Compaction_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_Compaction_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_Compaction_descriptor, + new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactedFile", "StoreHomeDir", }, + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.class, + org.apache.hadoop.hbase.protobuf.generated.WAL.Compaction.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(), + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto new file mode 100644 index 0000000..1932fce --- /dev/null +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "WAL"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "hbase.proto"; + +/** + * WAL entries + */ + +/** + * Special WAL entry to hold all related to a compaction. + * Written to WAL before completing compaction. There is + * sufficient info in the below message to complete later + * the compaction should we fail the WAL write.
+ */ +message Compaction { + required bytes tableName = 1; + required bytes encodedRegionName = 2; + required bytes familyName = 3; + repeated string compactionInput = 4; + required string compactedFile = 5; + required string storeHomeDir = 6; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 976c098..a69e647 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -35,9 +35,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; @@ -83,7 +81,7 @@ public class WALPlayer extends Configured implements Tool { // skip all other tables if (Bytes.equals(table, key.getTablename())) { for (KeyValue kv : value.getKeyValues()) { - if (HLogUtil.isMetaFamily(kv.getFamily())) continue; + if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } @@ -127,7 +125,7 @@ public class WALPlayer extends Configured implements Tool { KeyValue lastKV = null; for (KeyValue kv : value.getKeyValues()) { // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit - if (HLogUtil.isMetaFamily(kv.getFamily())) continue; + if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; // A WALEdit may contain multiple operations (HBASE-3584) and/or // multiple rows (HBASE-5229). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compaction.java new file mode 100644 index 0000000..7ecb987 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compaction.java @@ -0,0 +1,135 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.protobuf.generated.WAL; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * A Compaction description. Has all inputs and the output file. 
Created after we have finished writing the output + * compacted file. There is enough information here to 'complete' a compaction that failed after we've written the + * output file but before all old files have been deleted and the compacted result has been moved into place. + */ +public class Compaction { + private final byte [] tableName; + private final byte [] encodedRegionName; + private final byte [] familyName; + private final List compactionInput; + private final Path compactedFile; + private final Path storeHomeDir; + + /** + * @param tableName + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes() + * @param familyName + * @param compactionInput + * @param compactedFile + * @param storeHomeDir + */ + public Compaction(final byte [] tableName, final byte [] encodedRegionName, byte[] familyName, + List compactionInput, Path compactedFile, + Path storeHomeDir) { + this.tableName = cannotBeNullOrEmpty(tableName); + this.encodedRegionName = cannotBeNullOrEmpty(encodedRegionName); + this.familyName = cannotBeNullOrEmpty(familyName); + this.compactionInput = new ArrayList(compactionInput); + if (compactedFile == null) throw new NullPointerException(); + this.compactedFile = compactedFile; + if (storeHomeDir == null) throw new NullPointerException(); + this.storeHomeDir = storeHomeDir; + } + + private static byte [] cannotBeNullOrEmpty(final byte [] b) { + if (b == null) throw new NullPointerException(); + if (b.length <= 0) throw new IllegalArgumentException("Cannot be empty"); + return b; + } + + public List getCompactionInput() { + return compactionInput; + } + + public Path getCompactedFile() { + return compactedFile; + } + + public Path getStoreHomeDir() { + return storeHomeDir; + } + + public byte [] getTableName() { + return this.tableName; + } + + public byte [] getFamilyName() { + return this.familyName; + } + + public byte [] getEncodedRegionName() { + return this.encodedRegionName; + } + + public String toString() { + return "region=" + Bytes.toString(this.encodedRegionName) + ", family=" + Bytes.toString(familyName); + } + + /** + * @return This datastructure serialized as a pb message. 
+ * @see #parseFrom(byte[]) + */ + public byte [] toByteArray() { + WAL.Compaction.Builder builder = WAL.Compaction.newBuilder(); + builder.setTableName(ByteString.copyFrom(this.tableName)); + builder.setEncodedRegionName(ByteString.copyFrom(this.encodedRegionName)); + builder.setFamilyName(ByteString.copyFrom(this.familyName)); + builder.setCompactedFile(this.compactedFile.toString()); + builder.setStoreHomeDir(this.storeHomeDir.toString()); + for (Path p: this.compactionInput) { + builder.addCompactionInput(p.toString()); + } + return builder.build().toByteArray(); + } + + /** + * @param pb + * @return An instance of this class made by deserializing the passed pb bytes as a pb message. + * @throws InvalidProtocolBufferException + */ + static Compaction parseFrom(final byte [] pb) throws InvalidProtocolBufferException { + WAL.Compaction pbCompaction = WAL.Compaction.parseFrom(pb); + int count = pbCompaction.getCompactionInputCount(); + List compactionInput = new ArrayList(count); + for (int i = 0; i < count; i++) compactionInput.add(new Path(pbCompaction.getCompactionInput(i))); + return new Compaction(pbCompaction.getTableName().toByteArray(), + pbCompaction.getEncodedRegionName().toByteArray(), + pbCompaction.getFamilyName().toByteArray(), + compactionInput, + new Path(pbCompaction.getCompactedFile()), + new Path(pbCompaction.getStoreHomeDir())); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index d6814ac..d65cc1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -57,4 +57,4 @@ public interface CompactionRequestor { public void requestCompaction(final HRegion r, final Store s, final String why, int pri) throws IOException; -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 412abf6..bd6cb7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1204,7 +1204,7 @@ public class HRegion implements HeapSize { // , Writable{ * Do preparation for pending compaction. * @throws IOException */ - void doRegionCompactionPrep() throws IOException { + protected void doRegionCompactionPrep() throws IOException { } /* @@ -2056,12 +2056,7 @@ public class HRegion implements HeapSize { // , Writable{ boolean deletesCfSetConsistent = true; //The set of columnFamilies first seen for Delete. Set deletesCfSet = null; - WALEdit walEdit = new WALEdit(); - - long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); - - MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; boolean walSyncSuccessful = false; @@ -2330,7 +2325,6 @@ public class HRegion implements HeapSize { // , Writable{ if (noOfPuts > 0) { // There were some Puts in the batch. - double noOfMutations = noOfPuts + noOfDeletes; this.metricsRegion.updatePut(); } if (noOfDeletes > 0) { @@ -2893,7 +2887,7 @@ public class HRegion implements HeapSize { // , Writable{ for (KeyValue kv: val.getKeyValues()) { // Check this edit is for me.
Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || + if (kv.matchingFamily(WALEdit.METAFAMILY) || !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { skippedEdits++; continue; @@ -3536,10 +3530,6 @@ public class HRegion implements HeapSize { // , Writable{ } } - private boolean filterRow() { - return filter != null - && filter.filterRow(); - } private boolean filterRowKey(byte[] row) { return filter != null && filter.filterRowKey(row, 0, row.length); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index e135f8a..24aa431 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -128,7 +128,19 @@ public class HStore implements Store { private final int blockingStoreFileCount; private volatile long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; + /** + * Lock that allows only one flush to be ongoing at a time. + */ private final Object flushLock = new Object(); + /** + * RWLock for store operations. + * Locked in shared mode when the list of component stores is looked at: + * - all reads/writes to table data + * - checking for split + * Locked in exclusive mode when the list of component stores is modified: + * - closing + * - completing a compaction + */ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final boolean verifyBulkLoads; @@ -417,10 +429,7 @@ public class HStore implements Store { // open each store file in parallel completionService.submit(new Callable() { public StoreFile call() throws IOException { - StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf, - family.getBloomFilterType(), dataBlockEncoder); - storeFile.createReader(); - return storeFile; + return createStoreFileAndReader(p); } }); totalValidStoreFile++; @@ -450,6 +459,18 @@ public class HStore implements Store { return results; } + private StoreFile createStoreFileAndReader(final Path p) throws IOException { + return createStoreFileAndReader(p, this.dataBlockEncoder); + } + + private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException { + StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf, family.getBloomFilterType(), encoder); + storeFile.createReader(); + return storeFile; + } + + + @Override public long add(final KeyValue kv) { lock.readLock().lock(); @@ -1020,10 +1041,7 @@ public class HStore implements Store { region.getCoprocessorHost().postCompact(this, sf); } } else { - // Create storefile around what we wrote with a reader on it. 
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); - sf.createReader(); + sf = createStoreFileAndReader(writer.getPath()); } } finally { synchronized (filesCompacting) { @@ -1043,6 +1061,44 @@ public class HStore implements Store { return sf; } + + private static Path moveCompactedFileIntoHomeDir(FileSystem fs, Path srcPath, Path homeDir) + throws IOException { + Path dstPath = StoreFile.getRandomFilename(fs, homeDir); + LOG.info("Renaming compacted file at " + srcPath + " to " + dstPath); + try { + StoreFile.rename(fs, srcPath, dstPath); + } catch (IOException e) { + LOG.error("Failed move of compacted file " + srcPath + " to " + dstPath, e); + throw e; + } + return dstPath; + } + + /** + * Call to complete a compaction. + * Currently unused. It is for the case where we find in the WAL a compaction that was not finished. We could find + * one when recovering a WAL after a regionserver crash. See HBASE-2331. + * @param fs + * @param compaction + */ + public static void completeCompactionMarker(FileSystem fs, Compaction compaction) + throws IOException { + List inputPaths = compaction.getCompactionInput(); + Path compactedFile = compaction.getCompactedFile(); + Path homedir = compaction.getStoreHomeDir(); + if (compactedFile != null && fs.exists(compactedFile)) { + // We didn't finish moving it + LOG.info("Moving compacted file " + compactedFile + + " into store directory " + homedir); + moveCompactedFileIntoHomeDir(fs, compactedFile, homedir); + } + for (Path p : inputPaths) { + LOG.info("Removing already-compacted file " + p); + fs.delete(p, true); + } + } + @Override public void compactRecentForTesting(int N) throws IOException { List filesToCompact; @@ -1510,13 +1566,9 @@ public class HStore implements Store { throws IOException { StoreFile storeFile = null; try { - storeFile = new StoreFile(this.fs, path, this.conf, - this.cacheConf, this.family.getBloomFilterType(), - NoOpDataBlockEncoder.INSTANCE); - storeFile.createReader(); + storeFile = createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE); } catch (IOException e) { - LOG.error("Failed to open store file : " + path - + ", keeping it in tmp location", e); + LOG.error("Failed to open store file : " + path + ", keeping it in tmp location", e); throw e; } finally { if (storeFile != null) { @@ -1544,9 +1596,17 @@ public class HStore implements Store { * @return StoreFile created. May be null. * @throws IOException */ - StoreFile completeCompaction(final Collection compactedFiles, - final StoreFile.Writer compactedFile) - throws IOException { + StoreFile completeCompaction(final Collection compactedFiles, final StoreFile.Writer compactedFile) + throws IOException { + List inputPaths = new ArrayList(); + for (StoreFile f : compactedFiles) { + inputPaths.add(f.getPath()); + } + Path outputPath = (compactedFile != null)? compactedFile.getPath() : null; + Compaction compaction = new Compaction(this.getHRegionInfo().getTableName(), + this.getHRegionInfo().getEncodedNameAsBytes(), getFamily().getName(), inputPaths, outputPath, + getHomedir()); + region.getLog().writeCompactionMarker(compaction); // 1. Moving the new files into place -- if there is a new file (may not // be if all cells were expired or deleted).
StoreFile result = null; @@ -1554,17 +1614,8 @@ public class HStore implements Store { validateStoreFile(compactedFile.getPath()); // Move the file into the right spot Path origPath = compactedFile.getPath(); - Path destPath = new Path(homedir, origPath.getName()); - LOG.info("Renaming compacted file at " + origPath + " to " + destPath); - if (!fs.rename(origPath, destPath)) { - LOG.error("Failed move of compacted file " + origPath + " to " + - destPath); - throw new IOException("Failed move of compacted file " + origPath + - " to " + destPath); - } - result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); - result.createReader(); + Path dstPath = moveCompactedFileIntoHomeDir(fs, origPath, homedir); + result = createStoreFileAndReader(dstPath); } try { this.lock.writeLock().lock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 781abe5..c4986d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.Compaction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -105,7 +106,7 @@ import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Private class FSHLog implements HLog, Syncable { static final Log LOG = LogFactory.getLog(FSHLog.class); - + private final FileSystem fs; private final Path rootDir; private final Path dir; @@ -121,12 +122,11 @@ class FSHLog implements HLog, Syncable { private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; - private boolean failIfLogDirExists; private WALCoprocessorHost coprocessorHost; private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer - // Minimum tolerable replicas, if the actual value is lower than it, + // Minimum tolerable replicas, if the actual value is lower than it, // rollWriter will be triggered private int minTolerableReplication; private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas @@ -218,10 +218,10 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logName, final Configuration conf) throws IOException { - this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, + this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null); } - + /** * Constructor. 
* @@ -235,7 +235,7 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logName, final String oldLogName, final Configuration conf) throws IOException { - this(fs, root, logName, oldLogName, + this(fs, root, logName, oldLogName, conf, null, true, null); } @@ -262,7 +262,7 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logName, final Configuration conf, final List listeners, final String prefix) throws IOException { - this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, + this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix); } @@ -287,7 +287,7 @@ class FSHLog implements HLog, Syncable { * @throws IOException */ private FSHLog(final FileSystem fs, final Path root, final String logName, - final String oldLogName, final Configuration conf, + final String oldLogName, final Configuration conf, final List listeners, final boolean failIfLogDirExists, final String prefix) throws IOException { @@ -297,15 +297,13 @@ class FSHLog implements HLog, Syncable { this.dir = new Path(this.rootDir, logName); this.oldLogDir = new Path(this.rootDir, oldLogName); this.conf = conf; - + if (listeners != null) { for (WALActionsListener i: listeners) { registerWALActionsListener(i); } } - - this.failIfLogDirExists = failIfLogDirExists; - + this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", getDefaultBlockSize()); // Roll at 95% of block size. @@ -313,7 +311,7 @@ class FSHLog implements HLog, Syncable { this.logrollsize = (long)(this.blocksize * multi); this.optionalFlushInterval = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); - + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication", @@ -323,9 +321,9 @@ class FSHLog implements HLog, Syncable { this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); this.closeErrorsTolerated = conf.getInt( "hbase.regionserver.logroll.errors.tolerated", 0); - + this.logSyncerThread = new LogSyncer(this.optionalFlushInterval); - + LOG.info("HLog configuration: blocksize=" + StringUtils.byteDesc(this.blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + @@ -334,7 +332,7 @@ class FSHLog implements HLog, Syncable { // If prefix is null||empty then just name it hlog this.prefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8"); - + if (failIfLogDirExists && this.fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } @@ -349,7 +347,7 @@ class FSHLog implements HLog, Syncable { } // rollWriter sets this.hdfs_out if it can. rollWriter(); - + // handle the reflection necessary to call getNumCurrentReplicas() this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); @@ -359,7 +357,7 @@ class FSHLog implements HLog, Syncable { this.metrics = new MetricsWAL(); } - + // use reflection to search for getDefaultBlockSize(Path f) // if the method doesn't exist, fall back to using getDefaultBlockSize() private long getDefaultBlockSize() throws IOException { @@ -452,7 +450,7 @@ class FSHLog implements HLog, Syncable { * @return The wrapped stream our writer is using; its not the * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps * (In hdfs its an instance of DFSDataOutputStream). 
- * + * * usage: see TestLogRolling.java */ OutputStream getOutputStream() { @@ -553,7 +551,7 @@ class FSHLog implements HLog, Syncable { /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). - * + * * @param fs * @param path * @param conf @@ -738,28 +736,30 @@ class FSHLog implements HLog, Syncable { close(); if (!fs.exists(this.dir)) return; FileStatus[] files = fs.listStatus(this.dir); - for(FileStatus file : files) { - - Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); - // Tell our listeners that a log is going to be archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); + if (files != null) { + for(FileStatus file : files) { + + Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); + // Tell our listeners that a log is going to be archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogArchive(file.getPath(), p); + } } - } - if (!fs.rename(file.getPath(),p)) { - throw new IOException("Unable to rename " + file.getPath() + " to " + p); - } - // Tell our listeners that a log was archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); + if (!fs.rename(file.getPath(),p)) { + throw new IOException("Unable to rename " + file.getPath() + " to " + p); + } + // Tell our listeners that a log was archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogArchive(file.getPath(), p); + } } } + LOG.debug("Moved " + files.length + " log files to " + + FSUtils.getPath(this.oldLogDir)); } - LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(this.oldLogDir)); if (!fs.delete(dir, true)) { LOG.info("Unable to delete " + dir); } @@ -799,14 +799,15 @@ class FSHLog implements HLog, Syncable { /** * @param now - * @param regionName + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). * @param tableName * @param clusterId * @return New log key. 
*/ - protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, + protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum, long now, UUID clusterId) { - return new HLogKey(regionName, tableName, seqnum, now, clusterId); + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId); } @Override @@ -908,7 +909,7 @@ class FSHLog implements HLog, Syncable { } // Sync if catalog region, and if not then check if that table supports // deferred log flushing - if (doSync && + if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system @@ -918,14 +919,14 @@ class FSHLog implements HLog, Syncable { } @Override - public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, + public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, false); } @Override - public long append(HRegionInfo info, byte [] tableName, WALEdit edits, + public long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, true); @@ -947,8 +948,8 @@ class FSHLog implements HLog, Syncable { // List of pending writes to the HLog. There corresponds to transactions // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal, because the HDFS write - // method is pretty heavyweight as far as locking is concerned. The + // instead of writing them to HDFS piecemeal, because the HDFS write + // method is pretty heavyweight as far as locking is concerned. The // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); @@ -1031,7 +1032,7 @@ class FSHLog implements HLog, Syncable { if (this.closed) return; tempWriter = this.writer; // guaranteed non-null } - // if the transaction that we are interested in is already + // if the transaction that we are interested in is already // synced, then return immediately. if (txid <= this.syncedTillHere) { return; @@ -1039,7 +1040,7 @@ class FSHLog implements HLog, Syncable { try { long doneUpto; long now = EnvironmentEdgeManager.currentTimeMillis(); - // First flush all the pending writes to HDFS. Then + // First flush all the pending writes to HDFS. Then // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. @@ -1189,6 +1190,7 @@ class FSHLog implements HLog, Syncable { } } + // TODO: Remove info. Unused. protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, HTableDescriptor htd) throws IOException { @@ -1273,6 +1275,22 @@ class FSHLog implements HLog, Syncable { return obtainSeqNum(); } + /** + * Write the marker that a compaction has succeeded and is about to be committed. + * This provides info to the HMaster to allow it to recover the compaction if + * this regionserver dies in the middle (This part is not yet implemented). It also prevents the compaction from + * finishing if this regionserver has already lost its lease on the log. 
+ */ + public void writeCompactionMarker(final Compaction c) + throws IOException { + WALEdit e = WALEdit.createCompaction(c); + HLogKey key = makeKey(c.getEncodedRegionName(), c.getTableName(), obtainSeqNum(), System.currentTimeMillis(), + HConstants.DEFAULT_CLUSTER_ID); + logSyncerThread.append(new Entry(key, e)); + sync(); + LOG.info("Appended compaction marker " + c); + } + @Override public void completeCacheFlush(final byte [] encodedRegionName, final byte [] tableName, final long logSeqId, final boolean isMetaRegion) @@ -1284,7 +1302,7 @@ class FSHLog implements HLog, Syncable { long txid = 0; synchronized (updateLock) { long now = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit edit = completeCacheFlushLogEdit(); + WALEdit edit = WALEdit.createFlush(); HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); logSyncerThread.append(new Entry(key, edit)); @@ -1309,14 +1327,6 @@ class FSHLog implements HLog, Syncable { } } - private WALEdit completeCacheFlushLogEdit() { - KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null, - System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH); - WALEdit e = new WALEdit(); - e.add(kv); - return e; - } - @Override public void abortCacheFlush(byte[] encodedRegionName) { Long snapshot_seq = @@ -1347,13 +1357,13 @@ class FSHLog implements HLog, Syncable { /** * Get the directory we are making logs in. - * + * * @return dir */ protected Path getDir() { return dir; } - + static Path getHLogArchivePath(Path oldLogDir, Path p) { return new Path(oldLogDir, p.getName()); } @@ -1391,7 +1401,7 @@ class FSHLog implements HLog, Syncable { conf, baseDir, p, oldLogDir, fs); logSplitter.splitLog(); } - + @Override public WALCoprocessorHost getCoprocessorHost() { return coprocessorHost; @@ -1435,4 +1445,4 @@ class FSHLog implements HLog, Syncable { System.exit(-1); } } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 763b8d4..5e752c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -22,29 +22,25 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.OutputStream; -import java.util.NavigableSet; import java.util.UUID; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.regionserver.Compaction; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.Writable; @InterfaceAudience.Private public interface HLog { public static final Log LOG = LogFactory.getLog(HLog.class); - - public static final byte[] METAFAMILY = Bytes.toBytes("METAFAMILY"); static final byte[] METAROW = Bytes.toBytes("METAROW"); /** File Extension used while splitting an HLog into regions (HBASE-2312) */ @@ -100,7 +96,7 @@ 
public interface HLog { /** * Constructor for both params - * + * * @param edit * log's edit * @param key @@ -114,7 +110,7 @@ public interface HLog { /** * Gets the edit - * + * * @return edit */ public WALEdit getEdit() { @@ -123,7 +119,7 @@ public interface HLog { /** * Gets the key - * + * * @return key */ public HLogKey getKey() { @@ -132,7 +128,7 @@ public interface HLog { /** * Set compression context for this entry. - * + * * @param compressionContext * Compression context */ @@ -161,14 +157,14 @@ public interface HLog { /* * registers WALActionsListener - * + * * @param listener */ public void registerWALActionsListener(final WALActionsListener listener); /* * unregisters WALActionsListener - * + * * @param listener */ public boolean unregisterWALActionsListener(final WALActionsListener listener); @@ -182,7 +178,7 @@ public interface HLog { * Called by HRegionServer when it opens a new region to ensure that log * sequence numbers are always greater than the latest sequence number of the * region being brought on-line. - * + * * @param newvalue * We'll set log edit/sequence number to this value if it is greater * than the current value. @@ -196,19 +192,19 @@ public interface HLog { /** * Roll the log writer. That is, start writing log messages to a new file. - * + * * Because a log cannot be rolled during a cache flush, and a cache flush * spans two method calls, a special lock needs to be obtained so that a cache * flush cannot start when the log is being rolled and the log cannot be * rolled during a cache flush. - * + * *

* Note that this method cannot be synchronized because it is possible that * startCacheFlush runs, obtaining the cacheFlushLock, then this method could * start which would obtain the lock on this but block on obtaining the * cacheFlushLock and then completeCacheFlush could be called which would wait * for the lock on this and consequently never release the cacheFlushLock - * + * * @return If lots of logs, flush the returned regions so next time through we * can clean logs. Returns null if nothing to flush. Names are actual * region names as returned by {@link HRegionInfo#getEncodedName()} @@ -219,19 +215,19 @@ public interface HLog { /** * Roll the log writer. That is, start writing log messages to a new file. - * + * * Because a log cannot be rolled during a cache flush, and a cache flush * spans two method calls, a special lock needs to be obtained so that a cache * flush cannot start when the log is being rolled and the log cannot be * rolled during a cache flush. - * + * *

* Note that this method cannot be synchronized because it is possible that * startCacheFlush runs, obtaining the cacheFlushLock, then this method could * start which would obtain the lock on this but block on obtaining the * cacheFlushLock and then completeCacheFlush could be called which would wait * for the lock on this and consequently never release the cacheFlushLock - * + * * @param force * If true, force creation of a new writer even if no entries have * been written to the current writer @@ -246,21 +242,21 @@ public interface HLog { /** * Shut down the log. - * + * * @throws IOException */ public void close() throws IOException; /** * Shut down the log and delete the log directory - * + * * @throws IOException */ public void closeAndDelete() throws IOException; /** * Append an entry to the log. - * + * * @param regionInfo * @param logEdit * @param logKey @@ -274,7 +270,7 @@ public interface HLog { /** * Only used in tests. - * + * * @param info * @param tableName * @param edits @@ -289,7 +285,7 @@ public interface HLog { * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. The HLog is not flushed after * this transaction is written to the log. - * + * * @param info * @param tableName * @param edits @@ -306,7 +302,7 @@ public interface HLog { * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. The HLog is flushed after this * transaction is written to the log. - * + * * @param info * @param tableName * @param edits @@ -336,25 +332,25 @@ public interface HLog { /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. - * + * * Acquire a lock so that we do not roll the log between the start and * completion of a cache-flush. Otherwise the log-seq-id for the flush will * not appear in the correct logfile. - * + * * Ensuring that flushes and log-rolls don't happen concurrently also allows * us to temporarily put a log-seq-number in lastSeqWritten against the region * being flushed that might not be the earliest in-memory log-seq-number for * that region. By the time the flush is completed or aborted and before the * cacheFlushLock is released it is ensured that lastSeqWritten again has the * oldest in-memory edit's lsn for the region that was being flushed. - * + * * In this method, by removing the entry in lastSeqWritten for the region * being flushed we ensure that the next edit inserted in this region will be * correctly recorded in * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The * lsn of the earliest in-memory lsn - which is now in the memstore snapshot - * is saved temporarily in the lastSeqWritten map while the flush is active. - * + * * @return sequence ID to pass * {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[], * byte[], long)} @@ -365,9 +361,9 @@ public interface HLog { /** * Complete the cache flush - * + * * Protected by cacheFlushLock - * + * * @param encodedRegionName * @param tableName * @param logSeqId @@ -378,6 +374,14 @@ public interface HLog { throws IOException; /** + * Add a compaction marker to the WAL. + * Used to make sure we still have ownership of WAL before we start moving files around after a compaction completes. + * @param c A description of the compaction that just ran. + * @throws IOException + */ + public void writeCompactionMarker(final Compaction c) throws IOException; + + /** * Abort a cache flush. Call if the flush fails. 
Note that the only recovery * for an aborted flush currently is a restart of the regionserver so the * snapshot content dropped by the failure gets restored to the memstore. @@ -391,7 +395,7 @@ public interface HLog { /** * Get LowReplication-Roller status - * + * * @return lowReplicationRollEnabled */ public boolean isLowReplicationRollEnabled(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 7888aba..c205822 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -31,31 +31,17 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; - public class HLogUtil { static final Log LOG = LogFactory.getLog(HLogUtil.class); - static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH"); - - /** - * @param family - * @return true if the column is a meta column - */ - public static boolean isMetaFamily(byte[] family) { - return Bytes.equals(HLog.METAFAMILY, family); - } - @SuppressWarnings("unchecked") public static Class getKeyClass(Configuration conf) { return (Class) conf.getClass( @@ -90,40 +76,40 @@ public class HLogUtil { /* * Get a reader for the WAL. - * + * * @param fs - * + * * @param path - * + * * @param conf - * + * * @return A WAL reader. Close when done with it. - * + * * @throws IOException - * + * * public static HLog.Reader getReader(final FileSystem fs, final Path path, * Configuration conf) throws IOException { try { - * + * * if (logReaderClass == null) { - * + * * logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", * SequenceFileLogReader.class, Reader.class); } - * - * + * + * * HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path, * conf); return reader; } catch (IOException e) { throw e; } catch (Exception * e) { throw new IOException("Cannot get log reader", e); } } - * + * * * Get a writer for the WAL. - * + * * @param path - * + * * @param conf - * + * * @return A WAL writer. Close when done with it. - * + * * @throws IOException - * + * * public static HLog.Writer createWriter(final FileSystem fs, final Path * path, Configuration conf) throws IOException { try { if (logWriterClass == * null) { logWriterClass = @@ -136,7 +122,7 @@ public class HLogUtil { /** * Construct the HLog directory name - * + * * @param serverName * Server name formatted as described in {@link ServerName} * @return the relative HLog directory name, e.g. @@ -163,7 +149,7 @@ public class HLogUtil { /** * Move aside a bad edits file. - * + * * @param fs * @param edits * Edits file to move aside. @@ -245,7 +231,7 @@ public class HLogUtil { /** * Return regions (memstores) that have edits that are equal or less than the * passed oldestWALseqid. 
- * + * * @param oldestWALseqid * @param regionsToSeqids * Encoded region names to sequence ids @@ -271,7 +257,7 @@ public class HLogUtil { /** * Returns sorted set of edit files made by wal-log splitter, excluding files * with '.temp' suffix. - * + * * @param fs * @param regiondir * @return Files in passed regiondir as a sorted set. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 58fcbc3..ceaebcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -27,8 +27,11 @@ import java.util.NavigableMap; import java.util.TreeMap; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.regionserver.Compaction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.Writable; @@ -69,6 +72,11 @@ import org.apache.hadoop.io.Writable; */ @InterfaceAudience.Private public class WALEdit implements Writable, HeapSize { + // TODO: Make it so user cannot make a cf w/ this name. Make the illegal cf names. Ditto for row. + public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); + static final byte [] METAROW = Bytes.toBytes("METAROW"); + static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH"); + static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION"); private final int VERSION_2 = -1; @@ -80,12 +88,21 @@ public class WALEdit implements Writable, HeapSize { public WALEdit() { } + /** + * @param f + * @return True is f is {@link #METAFAMILY} + */ + public static boolean isMetaEditFamily(final byte [] f) { + return Bytes.equals(METAFAMILY, f); + } + public void setCompressionContext(final CompressionContext compressionContext) { this.compressionContext = compressionContext; } - public void add(KeyValue kv) { + public WALEdit add(KeyValue kv) { this.kvs.add(kv); + return this; } public boolean isEmpty() { @@ -200,4 +217,24 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } -} + /** + * Create a flush event WAL edit + * @return A cache flush marker WALEdit + */ + static WALEdit createFlush() { + // TODO: Should we be putting the family into the edit? 
St.Ack 20121121 + KeyValue kv = new KeyValue(HLog.METAROW, METAFAMILY, null, System.currentTimeMillis(), COMPLETE_CACHE_FLUSH); + return new WALEdit().add(kv); + } + + /** + * Create a compacion WALEdit + * @param c + * @return A WALEdit that has c serialized as its value + */ + static WALEdit createCompaction(final Compaction c) { + byte [] pbbytes = c.toByteArray(); + KeyValue kv = new KeyValue(HLog.METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes); + return new WALEdit().add(kv); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index c003bd5..44794c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -441,7 +441,7 @@ public class HBaseTestingUtility { //file system, the tests should use getBaseTestDir, otherwise, we can use //the working directory, and create a unique sub dir there FileSystem fs = getTestFileSystem(); - if (fs.getUri().getScheme().equals(fs.getLocal(conf).getUri().getScheme())) { + if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { if (dataTestDir == null) { setupDataTestDir(); } @@ -1256,6 +1256,15 @@ public class HBaseTestingUtility { return rowCount; } + public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException { + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Put put = new Put(data); + put.add(f, null, data); + t.put(put); + } + } + /** * Return the number of rows in the given table. */ @@ -2059,6 +2068,7 @@ public class HBaseTestingUtility { LOG.info("Found=" + rows); Threads.sleep(200); } + meta.close(); } /** @@ -2359,6 +2369,7 @@ public class HBaseTestingUtility { // region servers * regions per region server). int numberOfServers = admin.getClusterStatus().getServers().size(); if (numberOfServers == 0) { + admin.close(); throw new IllegalStateException("No live regionservers"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 8f7f3aa..c53a133 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HBaseCluster { this.hbaseCluster.join(); } + public List findRegionsForTable(byte[] tableName) { + ArrayList ret = new ArrayList(); + for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { + HRegionServer hrs = rst.getRegionServer(); + for (HRegion region : hrs.getOnlineRegions(tableName)) { + if (Bytes.equals(region.getTableDesc().getName(), tableName)) { + ret.add(region); + } + } + } + return ret; + } + + protected int getRegionServerIndex(ServerName serverName) { //we have a small number of region servers, this should be fine for now. 
List servers = getRegionServerThreads(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java index 3da3e8a..e12f850 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java @@ -23,9 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; @@ -99,13 +96,7 @@ public class TestFullLogReconstruction { // Load up the table with simple rows and count them int initialCount = TEST_UTIL.loadTable(table, FAMILY); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - int count = 0; - for (Result res : results) { - count++; - } - results.close(); + int count = TEST_UTIL.countRows(table); assertEquals(initialCount, count); @@ -114,15 +105,8 @@ public class TestFullLogReconstruction { } TEST_UTIL.expireRegionServerSession(0); - scan = new Scan(); - results = table.getScanner(scan); - int newCount = 0; - for (Result res : results) { - newCount++; - } + int newCount = TEST_UTIL.countRows(table); assertEquals(count, newCount); - results.close(); table.close(); } - -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java new file mode 100644 index 0000000..623ccac --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; +import org.apache.log4j.Level; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test for the case where a regionserver going down has enough cycles to do damage to regions + * that have actually been assigned elsewhere. + * + *

If we happen to assign a region before it is fully done with its old location -- i.e. it is on two servers at the + * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise + * change the region file set. The region in its new location will then get a surprise when it tries to do something + * w/ a file removed by the region in its old location on the dying server. + * + *

Making a test for this case is a little tough in that even if a file is deleted on the namenode, a + * client that opened the file before the delete can keep reading it until something changes the + * state of cached blocks in the already-open dfsclient (a block from the deleted file is cleaned + * from the datanode by the NN). + * + *

What we will do below is do an explicit check for existence on the files listed in the region that + * has had some files removed because of a compaction. This hurries things along and makes certain what would otherwise be a + * chance occurrence. + */ +@Category(MediumTests.class) +public class TestIOFencing { + static final Log LOG = LogFactory.getLog(TestIOFencing.class); + static { + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL); + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL); + } + + /** + * An override of HRegion that allows us to park compactions in a holding pattern and + * then, when appropriate for the test, allow them to proceed again. + */ + public static class CompactionBlockerRegion extends HRegion { + boolean compactionsBlocked = false; + Object compactionsBlockedLock = new Object(); + + Object compactionWaitingLock = new Object(); + boolean compactionWaiting = false; + + volatile int compactCount = 0; + + public CompactionBlockerRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, HRegionInfo regionInfo, + HTableDescriptor htd, RegionServerServices rss) { + super(tableDir, log, fs, conf, regionInfo, htd, rss); + } + + public void stopCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = true; + } + } + + public void allowCompactions() { + synchronized (compactionsBlockedLock) { + compactionsBlocked = false; + compactionsBlockedLock.notifyAll(); + } + } + + public void waitForCompactionToBlock() throws InterruptedException { + synchronized (compactionWaitingLock) { + while (!compactionWaiting) { + compactionWaitingLock.wait(); + } + } + } + + @Override + protected void doRegionCompactionPrep() throws IOException { + synchronized (compactionWaitingLock) { + compactionWaiting = true; + compactionWaitingLock.notifyAll(); + } + synchronized (compactionsBlockedLock) { + while (compactionsBlocked) { + try { + compactionsBlockedLock.wait(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + synchronized (compactionWaitingLock) { + compactionWaiting = false; + compactionWaitingLock.notifyAll(); + } + super.doRegionCompactionPrep(); + } + + @Override + public boolean compact(CompactionRequest cr) throws IOException { + try { + return super.compact(cr); + } finally { + compactCount++; + } + } + + public int countStoreFiles() { + int count = 0; + for (Store store : stores.values()) { + count += store.getNumberOfStoreFiles(); + } + return count; + } + } + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest"); + private final static byte[] FAMILY = Bytes.toBytes("family"); + private static final int FIRST_BATCH_COUNT = 4000; + private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT; + + /** + * Test that puts up a regionserver, starts a compaction on a loaded region but holds the + * compaction completion until after we have killed the server and the region has come up on + * a new regionserver altogether. This fakes the double assignment case where a region in one + * location changes the files out from underneath a region being served elsewhere.
+ */ + @Test + public void testFencingAroundCompaction() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + // Insert our custom region + c.setClass(HConstants.REGION_IMPL, CompactionBlockerRegion.class, HRegion.class); + c.setBoolean("dfs.support.append", true); + // Encourage plenty of flushes + c.setLong("hbase.hregion.memstore.flush.size", 200000); + // Only run compaction when we tell it to + c.setInt("hbase.hstore.compactionThreshold", 1000); + c.setLong("hbase.hstore.blockingStoreFiles", 1000); + // Compact quickly after we tell it to! + c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000); + LOG.info("Starting mini cluster"); + TEST_UTIL.startMiniCluster(1); + CompactionBlockerRegion compactingRegion = null; + HBaseAdmin admin = null; + try { + LOG.info("Creating admin"); + admin = new HBaseAdmin(c); + LOG.info("Creating table"); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + HTable table = new HTable(c, TABLE_NAME); + LOG.info("Loading test table"); + // Load some rows + TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); + // Find the region + List testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME); + assertEquals(1, testRegions.size()); + compactingRegion = (CompactionBlockerRegion)testRegions.get(0); + assertTrue(compactingRegion.countStoreFiles() > 1); + final byte REGION_NAME[] = compactingRegion.getRegionName(); + LOG.info("Blocking compactions"); + compactingRegion.stopCompactions(); + LOG.info("Asking for compaction"); + admin.majorCompact(TABLE_NAME); + LOG.info("Waiting for compaction to be about to start"); + compactingRegion.waitForCompactionToBlock(); + LOG.info("Starting a new server"); + RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer(); + HRegionServer newServer = newServerThread.getRegionServer(); + LOG.info("Killing region server ZK lease"); + TEST_UTIL.expireRegionServerSession(0); + CompactionBlockerRegion newRegion = null; + long startWaitTime = System.currentTimeMillis(); + while (newRegion == null) { + LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); + Thread.sleep(100); + newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME); + assertTrue("Timed out waiting for new server to open region", + System.currentTimeMillis() - startWaitTime < 60000); + } + LOG.info("Allowing compaction to proceed"); + compactingRegion.allowCompactions(); + while (compactingRegion.compactCount == 0) { + Thread.sleep(1000); + } + // The server we killed stays up until the compaction that was started before it was killed completes. In logs + // you should see the old regionserver now going down. + LOG.info("Compaction finished"); + // After compaction of old region finishes on the server that was going down, make sure that + // all the files we expect are still working when region is up in new location. + FileSystem fs = newRegion.getFilesystem(); + for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) { + assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f))); + } + // If we survive the split keep going... + // Now we make sure that the region isn't totally confused. Load up more rows. 
+ TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); + admin.majorCompact(TABLE_NAME); + startWaitTime = System.currentTimeMillis(); + while (newRegion.compactCount == 0) { + Thread.sleep(1000); + assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 30000); + } + assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table)); + } finally { + if (compactingRegion != null) { + compactingRegion.allowCompactions(); + } + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 426a586..0a7cb19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -34,11 +34,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.MediumTests; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.IsolationLevel; @@ -47,11 +47,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.BaseRowProcessor; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; - import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.junit.AfterClass; @@ -323,7 +320,7 @@ public class TestRowProcessorEndpoint { // We can also inject some meta data to the walEdit KeyValue metaKv = new KeyValue( - row, HLog.METAFAMILY, + row, WALEdit.METAFAMILY, Bytes.toBytes("I just increment counter"), Bytes.toBytes(counter)); walEdit.add(metaKv); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 265f777..8811c7d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -550,9 +550,9 @@ public class TestHLog { assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName())); assertTrue(Bytes.equals(tableName, key.getTablename())); KeyValue kv = val.getKeyValues().get(0); - assertTrue(Bytes.equals(HLog.METAROW, kv.getRow())); - assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily())); - assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH, + assertTrue(Bytes.equals(WALEdit.METAROW, kv.getRow())); + assertTrue(Bytes.equals(WALEdit.METAFAMILY, kv.getFamily())); + assertEquals(0, Bytes.compareTo(WALEdit.COMPLETE_CACHE_FLUSH, val.getKeyValues().get(0).getValue())); System.out.println(key + " " + val); } @@ -618,9 +618,9 
@@ public class TestHLog { assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName())); assertTrue(Bytes.equals(tableName, entry.getKey().getTablename())); - assertTrue(Bytes.equals(HLog.METAROW, val.getRow())); - assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily())); - assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH, + assertTrue(Bytes.equals(WALEdit.METAROW, val.getRow())); + assertTrue(Bytes.equals(WALEdit.METAFAMILY, val.getFamily())); + assertEquals(0, Bytes.compareTo(WALEdit.COMPLETE_CACHE_FLUSH, val.getValue())); System.out.println(entry.getKey() + " " + val); }
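
The patch writes the HBASE::COMPACTION marker into the WAL but nothing in this change reads it back yet; HMaster-side recovery is explicitly noted as unimplemented. Below is a minimal, illustrative sketch -- not part of the patch -- of how replay-side code could recognize such a marker. It assumes it lives in the org.apache.hadoop.hbase.regionserver.wal package so the package-private WALEdit.COMPACTION constant is visible; the class and method names are hypothetical.

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper for WAL replay; only the WALEdit constants and accessors used
// here come from the patch above.
class CompactionMarkerDetector {

  /** Returns true if the edit is a compaction marker written by writeCompactionMarker(). */
  static boolean isCompactionMarker(final WALEdit edit) {
    for (KeyValue kv : edit.getKeyValues()) {
      // Markers are keyed on METAFAMILY with the HBASE::COMPACTION qualifier.
      if (WALEdit.isMetaEditFamily(kv.getFamily())
          && Bytes.equals(WALEdit.COMPACTION, kv.getQualifier())) {
        return true;
      }
    }
    return false;
  }
}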
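
On the write side the fencing contract is simple: append the marker and sync it before touching any store files, so a regionserver that has already lost its WAL lease fails on the sync instead of mutating the region's file set out from under its replacement. A hedged sketch of the intended call site follows; no such caller is included in this patch, and commitCompaction plus its surrounding class are illustrative only.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.Compaction;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

// Hypothetical caller showing where writeCompactionMarker() is meant to sit in the
// compaction commit path.
class CompactionCommitSketch {

  void commitCompaction(final HLog log, final Compaction description) throws IOException {
    // writeCompactionMarker() appends the marker and syncs it. If this server has lost
    // its lease on the log, the sync throws and we never reach the file moves below.
    log.writeCompactionMarker(description);
    // ...only after the marker is durable would the store swap in the compacted file
    // and archive the inputs described by the Compaction...
  }
}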