diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 66885fc..3db0379 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -35,19 +35,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; - - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -63,13 +52,13 @@ import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException; @@ -117,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.access.TablePermission; import org.apache.hadoop.hbase.security.access.UserPermission; @@ -128,6 +118,17 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + /** * Protobufs utility. */ @@ -1997,4 +1998,23 @@ public final class ProtobufUtil { throw new IOException(e); } } + + public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family, + List inputPaths, List outputPaths, Path storeDir) { + // compaction descriptor contains relative paths. 
+ // input / output paths are relative to the store dir + // store dir is relative to region dir + CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder() + .setTableName(ByteString.copyFrom(info.getTableName())) + .setEncodedRegionName(ByteString.copyFrom(info.getEncodedNameAsBytes())) + .setFamilyName(ByteString.copyFrom(family)) + .setStoreHomeDir(storeDir.getName()); //make relative + for (Path inputPath : inputPaths) { + builder.addCompactionInput(inputPath.getName()); //relative path + } + for (Path outputPath : outputPaths) { + builder.addCompactionOutput(outputPath.getName()); + } + return builder.build(); + } } diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java new file mode 100644 index 0000000..020debd --- /dev/null +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WAL.java @@ -0,0 +1,938 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: WAL.proto + +package org.apache.hadoop.hbase.protobuf.generated; + +public final class WAL { + private WAL() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface CompactionDescriptorOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bytes tableName = 1; + boolean hasTableName(); + com.google.protobuf.ByteString getTableName(); + + // required bytes encodedRegionName = 2; + boolean hasEncodedRegionName(); + com.google.protobuf.ByteString getEncodedRegionName(); + + // required bytes familyName = 3; + boolean hasFamilyName(); + com.google.protobuf.ByteString getFamilyName(); + + // repeated string compactionInput = 4; + java.util.List getCompactionInputList(); + int getCompactionInputCount(); + String getCompactionInput(int index); + + // repeated string compactionOutput = 5; + java.util.List getCompactionOutputList(); + int getCompactionOutputCount(); + String getCompactionOutput(int index); + + // required string storeHomeDir = 6; + boolean hasStoreHomeDir(); + String getStoreHomeDir(); + } + public static final class CompactionDescriptor extends + com.google.protobuf.GeneratedMessage + implements CompactionDescriptorOrBuilder { + // Use CompactionDescriptor.newBuilder() to construct. 
+ private CompactionDescriptor(Builder builder) { + super(builder); + } + private CompactionDescriptor(boolean noInit) {} + + private static final CompactionDescriptor defaultInstance; + public static CompactionDescriptor getDefaultInstance() { + return defaultInstance; + } + + public CompactionDescriptor getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable; + } + + private int bitField0_; + // required bytes tableName = 1; + public static final int TABLENAME_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString tableName_; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + + // required bytes encodedRegionName = 2; + public static final int ENCODEDREGIONNAME_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString encodedRegionName_; + public boolean hasEncodedRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getEncodedRegionName() { + return encodedRegionName_; + } + + // required bytes familyName = 3; + public static final int FAMILYNAME_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString familyName_; + public boolean hasFamilyName() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public com.google.protobuf.ByteString getFamilyName() { + return familyName_; + } + + // repeated string compactionInput = 4; + public static final int COMPACTIONINPUT_FIELD_NUMBER = 4; + private com.google.protobuf.LazyStringList compactionInput_; + public java.util.List + getCompactionInputList() { + return compactionInput_; + } + public int getCompactionInputCount() { + return compactionInput_.size(); + } + public String getCompactionInput(int index) { + return compactionInput_.get(index); + } + + // repeated string compactionOutput = 5; + public static final int COMPACTIONOUTPUT_FIELD_NUMBER = 5; + private com.google.protobuf.LazyStringList compactionOutput_; + public java.util.List + getCompactionOutputList() { + return compactionOutput_; + } + public int getCompactionOutputCount() { + return compactionOutput_.size(); + } + public String getCompactionOutput(int index) { + return compactionOutput_.get(index); + } + + // required string storeHomeDir = 6; + public static final int STOREHOMEDIR_FIELD_NUMBER = 6; + private java.lang.Object storeHomeDir_; + public boolean hasStoreHomeDir() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public String getStoreHomeDir() { + java.lang.Object ref = storeHomeDir_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + storeHomeDir_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getStoreHomeDirBytes() { + java.lang.Object ref = storeHomeDir_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + storeHomeDir_ = b; + return b; + } else { + return 
(com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + tableName_ = com.google.protobuf.ByteString.EMPTY; + encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + familyName_ = com.google.protobuf.ByteString.EMPTY; + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + storeHomeDir_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTableName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasEncodedRegionName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasFamilyName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasStoreHomeDir()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, encodedRegionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, familyName_); + } + for (int i = 0; i < compactionInput_.size(); i++) { + output.writeBytes(4, compactionInput_.getByteString(i)); + } + for (int i = 0; i < compactionOutput_.size(); i++) { + output.writeBytes(5, compactionOutput_.getByteString(i)); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(6, getStoreHomeDirBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, encodedRegionName_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, familyName_); + } + { + int dataSize = 0; + for (int i = 0; i < compactionInput_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(compactionInput_.getByteString(i)); + } + size += dataSize; + size += 1 * getCompactionInputList().size(); + } + { + int dataSize = 0; + for (int i = 0; i < compactionOutput_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(compactionOutput_.getByteString(i)); + } + size += dataSize; + size += 1 * getCompactionOutputList().size(); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(6, getStoreHomeDirBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)) { + return super.equals(obj); + } 
+ org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other = (org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) obj; + + boolean result = true; + result = result && (hasTableName() == other.hasTableName()); + if (hasTableName()) { + result = result && getTableName() + .equals(other.getTableName()); + } + result = result && (hasEncodedRegionName() == other.hasEncodedRegionName()); + if (hasEncodedRegionName()) { + result = result && getEncodedRegionName() + .equals(other.getEncodedRegionName()); + } + result = result && (hasFamilyName() == other.hasFamilyName()); + if (hasFamilyName()) { + result = result && getFamilyName() + .equals(other.getFamilyName()); + } + result = result && getCompactionInputList() + .equals(other.getCompactionInputList()); + result = result && getCompactionOutputList() + .equals(other.getCompactionOutputList()); + result = result && (hasStoreHomeDir() == other.hasStoreHomeDir()); + if (hasStoreHomeDir()) { + result = result && getStoreHomeDir() + .equals(other.getStoreHomeDir()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasTableName()) { + hash = (37 * hash) + TABLENAME_FIELD_NUMBER; + hash = (53 * hash) + getTableName().hashCode(); + } + if (hasEncodedRegionName()) { + hash = (37 * hash) + ENCODEDREGIONNAME_FIELD_NUMBER; + hash = (53 * hash) + getEncodedRegionName().hashCode(); + } + if (hasFamilyName()) { + hash = (37 * hash) + FAMILYNAME_FIELD_NUMBER; + hash = (53 * hash) + getFamilyName().hashCode(); + } + if (getCompactionInputCount() > 0) { + hash = (37 * hash) + COMPACTIONINPUT_FIELD_NUMBER; + hash = (53 * hash) + getCompactionInputList().hashCode(); + } + if (getCompactionOutputCount() > 0) { + hash = (37 * hash) + COMPACTIONOUTPUT_FIELD_NUMBER; + hash = (53 * hash) + getCompactionOutputList().hashCode(); + } + if (hasStoreHomeDir()) { + hash = (37 * hash) + STOREHOMEDIR_FIELD_NUMBER; + hash = (53 * hash) + getStoreHomeDir().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + 
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptorOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + tableName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + familyName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + 
bitField0_ = (bitField0_ & ~0x00000008); + compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + storeHomeDir_ = ""; + bitField0_ = (bitField0_ & ~0x00000020); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor build() { + org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = new org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.tableName_ = tableName_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.encodedRegionName_ = encodedRegionName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.familyName_ = familyName_; + if (((bitField0_ & 0x00000008) == 0x00000008)) { + compactionInput_ = new com.google.protobuf.UnmodifiableLazyStringList( + compactionInput_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.compactionInput_ = compactionInput_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + compactionOutput_ = new com.google.protobuf.UnmodifiableLazyStringList( + compactionOutput_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.compactionOutput_ = compactionOutput_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000008; + } + result.storeHomeDir_ = storeHomeDir_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance()) return this; + if (other.hasTableName()) { + setTableName(other.getTableName()); + } + if (other.hasEncodedRegionName()) { + setEncodedRegionName(other.getEncodedRegionName()); + } + if (other.hasFamilyName()) { + 
setFamilyName(other.getFamilyName()); + } + if (!other.compactionInput_.isEmpty()) { + if (compactionInput_.isEmpty()) { + compactionInput_ = other.compactionInput_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureCompactionInputIsMutable(); + compactionInput_.addAll(other.compactionInput_); + } + onChanged(); + } + if (!other.compactionOutput_.isEmpty()) { + if (compactionOutput_.isEmpty()) { + compactionOutput_ = other.compactionOutput_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureCompactionOutputIsMutable(); + compactionOutput_.addAll(other.compactionOutput_); + } + onChanged(); + } + if (other.hasStoreHomeDir()) { + setStoreHomeDir(other.getStoreHomeDir()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTableName()) { + + return false; + } + if (!hasEncodedRegionName()) { + + return false; + } + if (!hasFamilyName()) { + + return false; + } + if (!hasStoreHomeDir()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + tableName_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + encodedRegionName_ = input.readBytes(); + break; + } + case 26: { + bitField0_ |= 0x00000004; + familyName_ = input.readBytes(); + break; + } + case 34: { + ensureCompactionInputIsMutable(); + compactionInput_.add(input.readBytes()); + break; + } + case 42: { + ensureCompactionOutputIsMutable(); + compactionOutput_.add(input.readBytes()); + break; + } + case 50: { + bitField0_ |= 0x00000020; + storeHomeDir_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required bytes tableName = 1; + private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + public Builder setTableName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + tableName_ = value; + onChanged(); + return this; + } + public Builder clearTableName() { + bitField0_ = (bitField0_ & ~0x00000001); + tableName_ = getDefaultInstance().getTableName(); + onChanged(); + return this; + } + + // required bytes encodedRegionName = 2; + private com.google.protobuf.ByteString encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasEncodedRegionName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getEncodedRegionName() { + return encodedRegionName_; + } + public Builder setEncodedRegionName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + encodedRegionName_ = value; + onChanged(); + return this; 
+ } + public Builder clearEncodedRegionName() { + bitField0_ = (bitField0_ & ~0x00000002); + encodedRegionName_ = getDefaultInstance().getEncodedRegionName(); + onChanged(); + return this; + } + + // required bytes familyName = 3; + private com.google.protobuf.ByteString familyName_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasFamilyName() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public com.google.protobuf.ByteString getFamilyName() { + return familyName_; + } + public Builder setFamilyName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + familyName_ = value; + onChanged(); + return this; + } + public Builder clearFamilyName() { + bitField0_ = (bitField0_ & ~0x00000004); + familyName_ = getDefaultInstance().getFamilyName(); + onChanged(); + return this; + } + + // repeated string compactionInput = 4; + private com.google.protobuf.LazyStringList compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureCompactionInputIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + compactionInput_ = new com.google.protobuf.LazyStringArrayList(compactionInput_); + bitField0_ |= 0x00000008; + } + } + public java.util.List + getCompactionInputList() { + return java.util.Collections.unmodifiableList(compactionInput_); + } + public int getCompactionInputCount() { + return compactionInput_.size(); + } + public String getCompactionInput(int index) { + return compactionInput_.get(index); + } + public Builder setCompactionInput( + int index, String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureCompactionInputIsMutable(); + compactionInput_.set(index, value); + onChanged(); + return this; + } + public Builder addCompactionInput(String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureCompactionInputIsMutable(); + compactionInput_.add(value); + onChanged(); + return this; + } + public Builder addAllCompactionInput( + java.lang.Iterable values) { + ensureCompactionInputIsMutable(); + super.addAll(values, compactionInput_); + onChanged(); + return this; + } + public Builder clearCompactionInput() { + compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + onChanged(); + return this; + } + void addCompactionInput(com.google.protobuf.ByteString value) { + ensureCompactionInputIsMutable(); + compactionInput_.add(value); + onChanged(); + } + + // repeated string compactionOutput = 5; + private com.google.protobuf.LazyStringList compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureCompactionOutputIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + compactionOutput_ = new com.google.protobuf.LazyStringArrayList(compactionOutput_); + bitField0_ |= 0x00000010; + } + } + public java.util.List + getCompactionOutputList() { + return java.util.Collections.unmodifiableList(compactionOutput_); + } + public int getCompactionOutputCount() { + return compactionOutput_.size(); + } + public String getCompactionOutput(int index) { + return compactionOutput_.get(index); + } + public Builder setCompactionOutput( + int index, String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureCompactionOutputIsMutable(); + compactionOutput_.set(index, value); + onChanged(); + return this; + } + public Builder addCompactionOutput(String value) { + if (value == null) { + throw new 
NullPointerException(); + } + ensureCompactionOutputIsMutable(); + compactionOutput_.add(value); + onChanged(); + return this; + } + public Builder addAllCompactionOutput( + java.lang.Iterable values) { + ensureCompactionOutputIsMutable(); + super.addAll(values, compactionOutput_); + onChanged(); + return this; + } + public Builder clearCompactionOutput() { + compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + onChanged(); + return this; + } + void addCompactionOutput(com.google.protobuf.ByteString value) { + ensureCompactionOutputIsMutable(); + compactionOutput_.add(value); + onChanged(); + } + + // required string storeHomeDir = 6; + private java.lang.Object storeHomeDir_ = ""; + public boolean hasStoreHomeDir() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public String getStoreHomeDir() { + java.lang.Object ref = storeHomeDir_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + storeHomeDir_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setStoreHomeDir(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + storeHomeDir_ = value; + onChanged(); + return this; + } + public Builder clearStoreHomeDir() { + bitField0_ = (bitField0_ & ~0x00000020); + storeHomeDir_ = getDefaultInstance().getStoreHomeDir(); + onChanged(); + return this; + } + void setStoreHomeDir(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000020; + storeHomeDir_ = value; + onChanged(); + } + + // @@protoc_insertion_point(builder_scope:CompactionDescriptor) + } + + static { + defaultInstance = new CompactionDescriptor(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:CompactionDescriptor) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_CompactionDescriptor_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_CompactionDescriptor_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\tWAL.proto\032\013hbase.proto\"\241\001\n\024CompactionD" + + "escriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedR" + + "egionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017c" + + "ompactionInput\030\004 \003(\t\022\030\n\020compactionOutput" + + "\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\tB6\n*org.apac" + + "he.hadoop.hbase.protobuf.generatedB\003WALH" + + "\001\240\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_CompactionDescriptor_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_CompactionDescriptor_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_CompactionDescriptor_descriptor, + new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", }, + 
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.class, + org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(), + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git hbase-protocol/src/main/protobuf/WAL.proto hbase-protocol/src/main/protobuf/WAL.proto new file mode 100644 index 0000000..59b303e --- /dev/null +++ hbase-protocol/src/main/protobuf/WAL.proto @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "WAL"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "hbase.proto"; + +/** + * WAL entries + */ + +/** + * Special WAL entry to hold all related to a compaction. + * Written to WAL before completing compaction. There is + * sufficient info in the below message to complete later + * the * compaction should we fail the WAL write. 
+ */ +message CompactionDescriptor { + required bytes tableName = 1; + required bytes encodedRegionName = 2; + required bytes familyName = 3; + repeated string compactionInput = 4; + repeated string compactionOutput = 5; + required string storeHomeDir = 6; +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 8dd8e9d..ec34ac5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -35,9 +35,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; @@ -83,7 +81,7 @@ public class WALPlayer extends Configured implements Tool { // skip all other tables if (Bytes.equals(table, key.getTablename())) { for (KeyValue kv : value.getKeyValues()) { - if (HLogUtil.isMetaFamily(kv.getFamily())) continue; + if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } } @@ -127,7 +125,7 @@ public class WALPlayer extends Configured implements Tool { KeyValue lastKV = null; for (KeyValue kv : value.getKeyValues()) { // filtering HLog meta entries - if (HLogUtil.isMetaFamily(kv.getFamily())) continue; + if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; // A WALEdit may contain multiple operations (HBASE-3584) and/or // multiple rows (HBASE-5229). 
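For reference, here is a minimal sketch of how the CompactionDescriptor added above round-trips through the generated builder and parser. It is illustration only, not part of the patch; the table name, encoded region name, and file names are made-up values. It mirrors what ProtobufUtil.toCompactionDescriptor produces: that method takes Path objects and keeps only the last path component, so every name in the marker is relative to the store directory.

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;

public class CompactionDescriptorExample {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Build a marker following the relative-path convention described in the patch:
    // input/output names are relative to the store dir, the store dir is relative to
    // the region dir. All literal values below are hypothetical.
    CompactionDescriptor cd = CompactionDescriptor.newBuilder()
        .setTableName(ByteString.copyFromUtf8("exampleTable"))             // made-up table name
        .setEncodedRegionName(ByteString.copyFromUtf8("0123456789abcdef")) // made-up encoded name
        .setFamilyName(ByteString.copyFromUtf8("cf"))
        .setStoreHomeDir("cf")             // store dir name, relative to the region dir
        .addCompactionInput("hfile-0001")  // compaction inputs, relative to the store dir
        .addCompactionInput("hfile-0002")
        .addCompactionOutput("hfile-0003") // compaction output, relative to the store dir
        .build();

    // The marker travels through the WAL as bytes and is parsed back on replay.
    byte[] wire = cd.toByteArray();
    CompactionDescriptor replayed = CompactionDescriptor.parseFrom(wire);
    assert replayed.getCompactionInputCount() == 2;
    assert "cf".equals(replayed.getStoreHomeDir());
  }
}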
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 03040eb..bd9c73b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.IsolationLevel; @@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; @@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -235,7 +236,7 @@ public class HRegion implements HeapSize { // , Writable{ private final HLog log; private final HRegionFileSystem fs; - private final Configuration conf; + protected final Configuration conf; private final Configuration baseConf; private final KeyValue.KVComparator comparator; private final int rowLockWaitDuration; @@ -481,7 +482,7 @@ public class HRegion implements HeapSize { // , Writable{ // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled. this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0; - + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -1127,7 +1128,7 @@ public class HRegion implements HeapSize { // , Writable{ * Do preparation for pending compaction. 
* @throws IOException */ - void doRegionCompactionPrep() throws IOException { + protected void doRegionCompactionPrep() throws IOException { } void triggerMajorCompaction() { @@ -2078,8 +2079,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2115,7 +2116,7 @@ public class HRegion implements HeapSize { // , Writable{ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); - Durability tmpDur = m.getDurability(); + Durability tmpDur = m.getDurability(); if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } @@ -2165,8 +2166,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -2877,9 +2878,16 @@ public class HRegion implements HeapSize { // , Writable{ for (KeyValue kv: val.getKeyValues()) { // Check this edit is for me. Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || + if (kv.matchingFamily(WALEdit.METAFAMILY) || !Bytes.equals(key.getEncodedRegionName(), this.getRegionInfo().getEncodedNameAsBytes())) { + //this is a special edit, we should handle it + CompactionDescriptor compaction = WALEdit.getCompaction(kv); + if (compaction != null) { + //replay the compaction + completeCompactionMarker(compaction); + } + skippedEdits++; continue; } @@ -2954,6 +2962,23 @@ public class HRegion implements HeapSize { // , Writable{ } /** + * Call to complete a compaction. Its for the case where we find in the WAL a compaction + * that was not finished. We could find one recovering a WAL after a regionserver crash. + * See HBASE-2331. + * @param fs + * @param compaction + */ + void completeCompactionMarker(CompactionDescriptor compaction) + throws IOException { + Store store = this.getStore(compaction.getFamilyName().toByteArray()); + if (store == null) { + LOG.warn("Found Compaction WAL edit for deleted family:" + Bytes.toString(compaction.getFamilyName().toByteArray())); + return; + } + store.completeCompactionMarker(compaction); + } + + /** * Used by tests * @param s Store to add edit too. * @param kv KeyValue to add. @@ -3425,10 +3450,10 @@ public class HRegion implements HeapSize { // , Writable{ public long getMvccReadPoint() { return this.readPt; } - + /** * Reset both the filter and the old filter. - * + * * @throws IOException in case a filter raises an I/O exception. */ protected void resetFilters() throws IOException { @@ -3637,7 +3662,7 @@ public class HRegion implements HeapSize { // , Writable{ // If joinedHeap is pointing to some other row, try to seek to a correct one. 
boolean mayHaveData = (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length)) - || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length), + || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length), true, true) && joinedHeap.peek() != null && joinedHeap.peek().matchingRow(currentRow, offset, length)); @@ -4245,7 +4270,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for region: " + b); b.getRegionFileSystem().logFileSystemState(LOG); } - + RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true); if (!rmt.prepare(null)) { throw new IOException("Unable to merge regions " + a + " and " + b); @@ -4271,7 +4296,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for new region"); dstRegion.getRegionFileSystem().logFileSystemState(LOG); } - + if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) { throw new IOException("Merged region " + dstRegion + " still has references after the compaction, is compaction canceled?"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 59a463f..2091a32 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -52,23 +52,23 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.exceptions.InvalidHFileException; import org.apache.hadoop.hbase.exceptions.WrongRegionException; -import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.exceptions.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -124,6 +124,16 @@ public class HStore implements Store { static int closeCheckInterval = 0; private volatile long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; + + /** + * RWLock for store operations. 
+ * Locked in shared mode when the list of component stores is looked at: + * - all reads/writes to table data + * - checking for split + * Locked in exclusive mode when the list of component stores is modified: + * - closing + * - completing a compaction + */ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final boolean verifyBulkLoads; @@ -385,14 +395,11 @@ public class HStore implements Store { new ExecutorCompletionService(storeFileOpenerThreadPool); int totalValidStoreFile = 0; - final FileSystem fs = this.getFileSystem(); for (final StoreFileInfo storeFileInfo: files) { // open each store file in parallel completionService.submit(new Callable() { public StoreFile call() throws IOException { - StoreFile storeFile = new StoreFile(fs, storeFileInfo.getPath(), conf, cacheConf, - family.getBloomFilterType(), dataBlockEncoder); - storeFile.createReader(); + StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath()); return storeFile; } }); @@ -436,6 +443,17 @@ public class HStore implements Store { return results; } + private StoreFile createStoreFileAndReader(final Path p) throws IOException { + return createStoreFileAndReader(p, this.dataBlockEncoder); + } + + private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException { + StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf, + this.family.getBloomFilterType(), encoder); + storeFile.createReader(); + return storeFile; + } + @Override public long add(final KeyValue kv) { lock.readLock().lock(); @@ -549,10 +567,9 @@ public class HStore implements Store { Path srcPath = new Path(srcPathStr); Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); - StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); + StoreFile sf = createStoreFileAndReader(dstPath); - StoreFile.Reader r = sf.createReader(); + StoreFile.Reader r = sf.getReader(); this.storeSize += r.length(); this.totalUncompressedBytes += r.getTotalUncompressedBytes(); @@ -712,10 +729,9 @@ public class HStore implements Store { Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); status.setStatus("Flushing " + this + ": reopening flushed file"); - StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); + StoreFile sf = createStoreFileAndReader(dstPath); - StoreFile.Reader r = sf.createReader(); + StoreFile.Reader r = sf.getReader(); this.storeSize += r.length(); this.totalUncompressedBytes += r.getTotalUncompressedBytes(); @@ -874,6 +890,29 @@ public class HStore implements Store { *

We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. * + *

Compaction event should be idempotent, since there is no IO Fencing for + * the region directory in hdfs. A region server might still try to complete the + * compaction after it lost the region. That is why the following events are carefully + * ordered for a compaction: + * 1. Compaction writes new files under region/.tmp directory (compaction output) + * 2. Compaction atomically moves the temporary file under region directory + * 3. Compaction appends a WAL edit containing the compaction input and output files. + * Forces sync on WAL. + * 4. Compaction deletes the input files from the region directory. + * + * Failure conditions are handled like this: + * - If RS fails before 2, compaction wont complete. Even if RS lives on and finishes + * the compaction later, it will only write the new data file to the region directory. + * Since we already have this data, this will be idempotent but we will have a redundant + * copy of the data. + * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The + * RS that failed won't be able to finish snyc() for WAL because of lease recovery in WAL. + * - If RS fails after 3, the region region server who opens the region will pick up the + * the compaction marker from the WAL and replay it by removing the compaction input files. + * Failed RS can also attempt to delete those files, but the operation will be idempotent + * + * See HBASE-2231 for details. + * * @param compaction compaction details obtained from requestCompaction() * @throws IOException * @return Storefile we compacted into or null if we failed or opted out early. @@ -902,6 +941,13 @@ public class HStore implements Store { List newFiles = compaction.compact(); // Move the compaction into place. if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { + //Write compaction to WAL + List inputPaths = new ArrayList(); + for (StoreFile f : filesToCompact) { + inputPaths.add(f.getPath()); + } + + ArrayList outputPaths = new ArrayList(newFiles.size()); for (Path newFile: newFiles) { assert newFile != null; StoreFile sf = moveFileIntoPlace(newFile); @@ -910,14 +956,21 @@ public class HStore implements Store { } assert sf != null; sfs.add(sf); + outputPaths.add(sf.getPath()); + } + if (region.getLog() != null) { + HRegionInfo info = this.region.getRegionInfo(); + CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, + family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString())); + + HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(), + this.region.getRegionInfo(), compactionDescriptor); } completeCompaction(filesToCompact, sfs); } else { for (Path newFile: newFiles) { // Create storefile around what we wrote with a reader on it. - StoreFile sf = new StoreFile(this.getFileSystem(), newFile, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); - sf.createReader(); + StoreFile sf = createStoreFileAndReader(newFile); sfs.add(sf); } } @@ -966,10 +1019,58 @@ public class HStore implements Store { validateStoreFile(newFile); // Move the file into the right spot Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); - StoreFile result = new StoreFile(this.getFileSystem(), destPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); - result.createReader(); - return result; + StoreFile sf = createStoreFileAndReader(destPath); + + return sf; + } + + /** + * Call to complete a compaction. 
Its for the case where we find in the WAL a compaction + * that was not finished. We could find one recovering a WAL after a regionserver crash. + * See HBASE-2331. + * @param compaction + */ + public void completeCompactionMarker(CompactionDescriptor compaction) + throws IOException { + LOG.debug("Completing compaction from the WAL marker"); + List compactionInputs = compaction.getCompactionInputList(); + List compactionOutputs = compaction.getCompactionOutputList(); + + List outputStoreFiles = new ArrayList(compactionOutputs.size()); + for (String compactionOutput : compactionOutputs) { + //we should have this store file already + boolean found = false; + Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput); + outputPath = outputPath.makeQualified(fs.getFileSystem()); + for (StoreFile sf : this.getStorefiles()) { + if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) { + found = true; + break; + } + } + if (!found) { + if (getFileSystem().exists(outputPath)) { + outputStoreFiles.add(createStoreFileAndReader(outputPath)); + } + } + } + + List inputPaths = new ArrayList(compactionInputs.size()); + for (String compactionInput : compactionInputs) { + Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput); + inputPath = inputPath.makeQualified(fs.getFileSystem()); + inputPaths.add(inputPath); + } + + //some of the input files might already be deleted + List inputStoreFiles = new ArrayList(compactionInputs.size()); + for (StoreFile sf : this.getStorefiles()) { + if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) { + inputStoreFiles.add(sf); + } + } + + this.completeCompaction(inputStoreFiles, outputStoreFiles); } /** @@ -1176,10 +1277,7 @@ public class HStore implements Store { throws IOException { StoreFile storeFile = null; try { - storeFile = new StoreFile(this.getFileSystem(), path, this.conf, - this.cacheConf, this.family.getBloomFilterType(), - NoOpDataBlockEncoder.INSTANCE); - storeFile.createReader(); + createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE); } catch (IOException e) { LOG.error("Failed to open store file : " + path + ", keeping it in tmp location", e); @@ -1210,7 +1308,7 @@ public class HStore implements Store { * @return StoreFile created. May be null. 
* @throws IOException */ - private void completeCompaction(final Collection compactedFiles, + protected void completeCompaction(final Collection compactedFiles, final Collection result) throws IOException { try { this.lock.writeLock().lock(); @@ -1235,6 +1333,9 @@ public class HStore implements Store { // let the archive util decide if we should archive or delete the files LOG.debug("Removing store files after compaction..."); + for (StoreFile compactedFile : compactedFiles) { + compactedFile.closeReader(true); + } this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); } catch (IOException e) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 18b7c1a..f053398 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -103,7 +104,7 @@ public interface Store extends HeapSize, StoreConfigInformation { * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic * across all of them. * @param cells - * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param readpoint readpoint below which we can safely remove duplicate KVs * @return memstore size delta * @throws IOException */ @@ -190,6 +191,15 @@ public interface Store extends HeapSize, StoreConfigInformation { public StoreFlushContext createFlushContext(long cacheFlushId); + /** + * Call to complete a compaction. Its for the case where we find in the WAL a compaction + * that was not finished. We could find one recovering a WAL after a regionserver crash. + * See HBASE-2331. + * @param compaction + */ + public void completeCompactionMarker(CompactionDescriptor compaction) + throws IOException; + // Split oriented methods public boolean canSplit(); @@ -211,7 +221,7 @@ public interface Store extends HeapSize, StoreConfigInformation { /** * This method should only be called from HRegion. It is assumed that the ranges of values in the * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) - * + * * @param srcPathStr * @param sequenceId sequence Id associated with the HFile */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index a375c8b..ed36122 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -27,7 +27,6 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.TreeMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -107,7 +106,7 @@ import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Private class FSHLog implements HLog, Syncable { static final Log LOG = LogFactory.getLog(FSHLog.class); - + private final FileSystem fs; private final Path rootDir; private final Path dir; @@ -123,12 +122,11 @@ class FSHLog implements HLog, Syncable { private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; - private boolean failIfLogDirExists; private WALCoprocessorHost coprocessorHost; private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer - // Minimum tolerable replicas, if the actual value is lower than it, + // Minimum tolerable replicas, if the actual value is lower than it, // rollWriter will be triggered private int minTolerableReplication; private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas @@ -241,10 +239,10 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf) throws IOException { - this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, false); } - + /** * Constructor. * @@ -258,7 +256,7 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logDir, final String oldLogDir, final Configuration conf) throws IOException { - this(fs, root, logDir, oldLogDir, + this(fs, root, logDir, oldLogDir, conf, null, true, null, false); } @@ -284,7 +282,7 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf, final List listeners, final String prefix) throws IOException { - this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix, false); } @@ -311,7 +309,7 @@ class FSHLog implements HLog, Syncable { * @throws IOException */ public FSHLog(final FileSystem fs, final Path root, final String logDir, - final String oldLogDir, final Configuration conf, + final String oldLogDir, final Configuration conf, final List listeners, final boolean failIfLogDirExists, final String prefix, boolean forMeta) throws IOException { @@ -322,15 +320,13 @@ class FSHLog implements HLog, Syncable { this.oldLogDir = new Path(this.rootDir, oldLogDir); this.forMeta = forMeta; this.conf = conf; - + if (listeners != null) { for (WALActionsListener i: listeners) { registerWALActionsListener(i); } } - - this.failIfLogDirExists = failIfLogDirExists; - + this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", getDefaultBlockSize()); // Roll at 95% of block size. 
@@ -338,7 +334,7 @@ class FSHLog implements HLog, Syncable { this.logrollsize = (long)(this.blocksize * multi); this.optionalFlushInterval = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); - + this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication", @@ -348,9 +344,9 @@ class FSHLog implements HLog, Syncable { this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); this.closeErrorsTolerated = conf.getInt( "hbase.regionserver.logroll.errors.tolerated", 0); - + this.logSyncer = new LogSyncer(this.optionalFlushInterval); - + LOG.info("HLog configuration: blocksize=" + StringUtils.byteDesc(this.blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + @@ -375,7 +371,7 @@ class FSHLog implements HLog, Syncable { } // rollWriter sets this.hdfs_out if it can. rollWriter(); - + // handle the reflection necessary to call getNumCurrentReplicas() this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); @@ -392,7 +388,7 @@ class FSHLog implements HLog, Syncable { this.metrics = new MetricsWAL(); } - + // use reflection to search for getDefaultBlockSize(Path f) // if the method doesn't exist, fall back to using getDefaultBlockSize() private long getDefaultBlockSize() throws IOException { @@ -485,7 +481,7 @@ class FSHLog implements HLog, Syncable { * @return The wrapped stream our writer is using; its not the * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps * (In hdfs its an instance of DFSDataOutputStream). - * + * * usage: see TestLogRolling.java */ OutputStream getOutputStream() { @@ -576,7 +572,7 @@ class FSHLog implements HLog, Syncable { /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). - * + * * @param fs * @param path * @param conf @@ -773,28 +769,30 @@ class FSHLog implements HLog, Syncable { close(); if (!fs.exists(this.dir)) return; FileStatus[] files = fs.listStatus(this.dir); - for(FileStatus file : files) { + if (files != null) { + for(FileStatus file : files) { - Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); - // Tell our listeners that a log is going to be archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); + Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); + // Tell our listeners that a log is going to be archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogArchive(file.getPath(), p); + } } - } - if (!fs.rename(file.getPath(),p)) { - throw new IOException("Unable to rename " + file.getPath() + " to " + p); - } - // Tell our listeners that a log was archived. - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); + if (!fs.rename(file.getPath(),p)) { + throw new IOException("Unable to rename " + file.getPath() + " to " + p); + } + // Tell our listeners that a log was archived. 
+ if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogArchive(file.getPath(), p); + } } } + LOG.debug("Moved " + files.length + " log files to " + + FSUtils.getPath(this.oldLogDir)); } - LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(this.oldLogDir)); if (!fs.delete(dir, true)) { LOG.info("Unable to delete " + dir); } @@ -844,14 +842,15 @@ class FSHLog implements HLog, Syncable { /** * @param now - * @param regionName + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). * @param tableName * @param clusterId * @return New log key. */ - protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, + protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum, long now, UUID clusterId) { - return new HLogKey(regionName, tableName, seqnum, now, clusterId); + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId); } @Override @@ -953,7 +952,7 @@ class FSHLog implements HLog, Syncable { } // Sync if catalog region, and if not then check if that table supports // deferred log flushing - if (doSync && + if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system @@ -963,14 +962,14 @@ class FSHLog implements HLog, Syncable { } @Override - public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, + public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, false); } @Override - public long append(HRegionInfo info, byte [] tableName, WALEdit edits, + public long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, true); @@ -992,8 +991,8 @@ class FSHLog implements HLog, Syncable { // List of pending writes to the HLog. There corresponds to transactions // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal, because the HDFS write - // method is pretty heavyweight as far as locking is concerned. The + // instead of writing them to HDFS piecemeal, because the HDFS write + // method is pretty heavyweight as far as locking is concerned. The // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); @@ -1088,7 +1087,7 @@ class FSHLog implements HLog, Syncable { try { long doneUpto; long now = EnvironmentEdgeManager.currentTimeMillis(); - // First flush all the pending writes to HDFS. Then + // First flush all the pending writes to HDFS. Then // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. @@ -1114,7 +1113,7 @@ class FSHLog implements HLog, Syncable { tempWriter = this.writer; logSyncer.hlogFlush(tempWriter, pending); } - } + } } // another thread might have sync'ed avoid double-sync'ing if (txid <= this.syncedTillHere) { @@ -1251,6 +1250,7 @@ class FSHLog implements HLog, Syncable { } } + // TODO: Remove info. Unused. 
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, HTableDescriptor htd) throws IOException { @@ -1363,13 +1363,13 @@ class FSHLog implements HLog, Syncable { /** * Get the directory we are making logs in. - * + * * @return dir */ protected Path getDir() { return dir; } - + static Path getHLogArchivePath(Path oldLogDir, Path p) { return new Path(oldLogDir, p.getName()); } @@ -1407,7 +1407,7 @@ class FSHLog implements HLog, Syncable { conf, baseDir, p, oldLogDir, fs); logSplitter.splitLog(); } - + @Override public WALCoprocessorHost getCoprocessorHost() { return coprocessorHost; @@ -1456,4 +1456,4 @@ class FSHLog implements HLog, Syncable { System.exit(-1); } } -} +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 9442180..bba8b32 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -27,24 +27,20 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.exceptions.FailedLogCloseException; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.FailedLogCloseException; +import org.apache.hadoop.io.Writable; @InterfaceAudience.Private public interface HLog { public static final Log LOG = LogFactory.getLog(HLog.class); - public static final byte[] METAFAMILY = Bytes.toBytes("METAFAMILY"); - static final byte[] METAROW = Bytes.toBytes("METAROW"); - /** File Extension used while splitting an HLog into regions (HBASE-2312) */ public static final String SPLITTING_EXT = "-splitting"; public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; @@ -96,7 +92,7 @@ public interface HLog { /** * Constructor for both params - * + * * @param edit * log's edit * @param key @@ -110,7 +106,7 @@ public interface HLog { /** * Gets the edit - * + * * @return edit */ public WALEdit getEdit() { @@ -119,7 +115,7 @@ public interface HLog { /** * Gets the key - * + * * @return key */ public HLogKey getKey() { @@ -128,7 +124,7 @@ public interface HLog { /** * Set compression context for this entry. - * + * * @param compressionContext * Compression context */ @@ -157,14 +153,14 @@ public interface HLog { /** * registers WALActionsListener - * + * * @param listener */ public void registerWALActionsListener(final WALActionsListener listener); /** * unregisters WALActionsListener - * + * * @param listener */ public boolean unregisterWALActionsListener(final WALActionsListener listener); @@ -178,7 +174,7 @@ public interface HLog { * Called by HRegionServer when it opens a new region to ensure that log * sequence numbers are always greater than the latest sequence number of the * region being brought on-line. - * + * * @param newvalue * We'll set log edit/sequence number to this value if it is greater * than the current value. @@ -192,7 +188,7 @@ public interface HLog { /** * Roll the log writer. 
That is, start writing log messages to a new file. - * + * *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. @@ -207,11 +203,11 @@ public interface HLog { /** * Roll the log writer. That is, start writing log messages to a new file. - * + * *

* The implementation is synchronized in order to make sure there's one rollWriter * running at any given time. - * + * * @param force * If true, force creation of a new writer even if no entries have * been written to the current writer @@ -226,21 +222,21 @@ public interface HLog { /** * Shut down the log. - * + * * @throws IOException */ public void close() throws IOException; /** * Shut down the log and delete the log directory - * + * * @throws IOException */ public void closeAndDelete() throws IOException; /** * Append an entry to the log. - * + * * @param regionInfo * @param logEdit * @param logKey @@ -254,7 +250,7 @@ public interface HLog { /** * Only used in tests. - * + * * @param info * @param tableName * @param edits @@ -269,7 +265,7 @@ public interface HLog { * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. The HLog is not flushed after * this transaction is written to the log. - * + * * @param info * @param tableName * @param edits @@ -286,7 +282,7 @@ public interface HLog { * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. The HLog is flushed after this * transaction is written to the log. - * + * * @param info * @param tableName * @param edits @@ -351,7 +347,7 @@ public interface HLog { /** * Get LowReplication-Roller status - * + * * @return lowReplicationRollEnabled */ public boolean isLowReplicationRollEnabled(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index b2cd2f6..45ac265 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -28,27 +28,21 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; - public class HLogUtil { static final Log LOG = LogFactory.getLog(HLogUtil.class); - /** - * @param family - * @return true if the column is a meta column - */ - public static boolean isMetaFamily(byte[] family) { - return Bytes.equals(HLog.METAFAMILY, family); - } - @SuppressWarnings("unchecked") public static Class getKeyClass(Configuration conf) { return (Class) conf.getClass( @@ -69,7 +63,7 @@ public class HLogUtil { /** * Pattern used to validate a HLog file name */ - private static final Pattern pattern = + private static final Pattern pattern = Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*"); /** @@ -84,40 +78,40 @@ public class HLogUtil { /* * Get a reader for the WAL. - * + * * @param fs - * + * * @param path - * + * * @param conf - * + * * @return A WAL reader. Close when done with it. 
- * + * * @throws IOException - * + * * public static HLog.Reader getReader(final FileSystem fs, final Path path, * Configuration conf) throws IOException { try { - * + * * if (logReaderClass == null) { - * + * * logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", * SequenceFileLogReader.class, Reader.class); } - * - * + * + * * HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path, * conf); return reader; } catch (IOException e) { throw e; } catch (Exception * e) { throw new IOException("Cannot get log reader", e); } } - * + * * * Get a writer for the WAL. - * + * * @param path - * + * * @param conf - * + * * @return A WAL writer. Close when done with it. - * + * * @throws IOException - * + * * public static HLog.Writer createWriter(final FileSystem fs, final Path * path, Configuration conf) throws IOException { try { if (logWriterClass == * null) { logWriterClass = @@ -130,7 +124,7 @@ public class HLogUtil { /** * Construct the HLog directory name - * + * * @param serverName * Server name formatted as described in {@link ServerName} * @return the relative HLog directory name, e.g. @@ -157,7 +151,7 @@ public class HLogUtil { /** * Move aside a bad edits file. - * + * * @param fs * @param edits * Edits file to move aside. @@ -239,7 +233,7 @@ public class HLogUtil { /** * Returns sorted set of edit files made by wal-log splitter, excluding files * with '.temp' suffix. - * + * * @param fs * @param regiondir * @return Files in passed regiondir as a sorted set. @@ -287,4 +281,18 @@ public class HLogUtil { } return false; } + + /** + * Write the marker that a compaction has succeeded and is about to be committed. + * This provides info to the HMaster to allow it to recover the compaction if + * this regionserver dies in the middle (This part is not yet implemented). It also prevents the compaction from + * finishing if this regionserver has already lost its lease on the log. + */ + public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c) + throws IOException { + WALEdit e = WALEdit.createCompaction(c); + log.append(info, c.getTableName().toByteArray(), e, + EnvironmentEdgeManager.currentTimeMillis(), htd); + LOG.info("Appended compaction marker " + c); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 81f8fba..f9b72f2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -27,8 +27,9 @@ import java.util.NavigableMap; import java.util.TreeMap; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.Writable; @@ -69,6 +70,11 @@ import org.apache.hadoop.io.Writable; */ @InterfaceAudience.Private public class WALEdit implements Writable, HeapSize { + // TODO: Make it so user cannot make a cf w/ this name. Make the illegal cf names. Ditto for row. 
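The writeCompactionMarker() helper added to HLogUtil above pairs with ProtobufUtil.toCompactionDescriptor, which the TestHRegion changes further down also use. The following is a minimal sketch of how a caller that already holds the region's HLog, HTableDescriptor and HRegionInfo might combine them; the helper class and parameter names are invented for illustration.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;

// Illustrative only: append the compaction marker before any store files are swapped.
final class CompactionMarkerWriteSketch {
  static void writeMarkerBeforeCommit(HLog log, HTableDescriptor htd, HRegionInfo info,
      byte[] family, List<Path> inputFiles, List<Path> outputFiles, Path storeDir)
      throws IOException {
    // The descriptor keeps only names relative to the store directory, which is how
    // completeCompactionMarker() resolves them again at replay time.
    CompactionDescriptor descriptor =
        ProtobufUtil.toCompactionDescriptor(info, family, inputFiles, outputFiles, storeDir);
    // The append doubles as the fence: if this regionserver has already lost its
    // lease on the log, the marker cannot be written and the compaction must not commit.
    HLogUtil.writeCompactionMarker(log, htd, info, descriptor);
  }
}

If the append throws, the caller is expected to abandon the compaction rather than move files, which is what the new TestIOFencing test below relies on.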
+ public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); + static final byte [] METAROW = Bytes.toBytes("METAROW"); + static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH"); + static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION"); private final int VERSION_2 = -1; @@ -80,12 +86,21 @@ public class WALEdit implements Writable, HeapSize { public WALEdit() { } + /** + * @param f + * @return True is f is {@link #METAFAMILY} + */ + public static boolean isMetaEditFamily(final byte [] f) { + return Bytes.equals(METAFAMILY, f); + } + public void setCompressionContext(final CompressionContext compressionContext) { this.compressionContext = compressionContext; } - public void add(KeyValue kv) { + public WALEdit add(KeyValue kv) { this.kvs.add(kv); + return this; } public boolean isEmpty() { @@ -197,4 +212,26 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } -} + /** + * Create a compacion WALEdit + * @param c + * @return A WALEdit that has c serialized as its value + */ + public static WALEdit createCompaction(final CompactionDescriptor c) { + byte [] pbbytes = c.toByteArray(); + KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes); + return new WALEdit().add(kv); //replication scope null so that this won't be replicated + } + + /** + * Deserialized and returns a CompactionDescriptor is the KeyValue contains one. + * @param kv the key value + * @return deserialized CompactionDescriptor or null. + */ + public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException { + if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) { + return CompactionDescriptor.parseFrom(kv.getValue()); + } + return null; + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index accf181..809e772 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; @@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; import org.apache.hadoop.hbase.exceptions.TableExistsException; import org.apache.hadoop.hbase.exceptions.TableNotEnabledException; @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -1322,6 +1321,15 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility { return rowCount; } + public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException { + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Put put = new Put(data); + put.add(f, null, data); + t.put(put); + } + } + /** * Return the number of rows in the given table. */ @@ -1937,7 +1945,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /* * Retrieves a splittable region randomly from tableName - * + * * @param tableName name of table * @param maxAttempts maximum number of attempts, unlimited for value of -1 * @return the HRegion chosen, null if none was found within limit of maxAttempts @@ -1956,7 +1964,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } regCount = regions.size(); // There are chances that before we get the region for the table from an RS the region may - // be going for CLOSE. This may be because online schema change is enabled + // be going for CLOSE. This may be because online schema change is enabled if (regCount > 0) { idx = random.nextInt(regCount); // if we have just tried this region, there is no need to try again @@ -1974,7 +1982,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } while (maxAttempts == -1 || attempts < maxAttempts); return null; } - + public MiniZooKeeperCluster getZkCluster() { return zkCluster; } @@ -2252,10 +2260,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { scanner.close(); return result; } - + /** * Create region split keys between startkey and endKey - * + * * @param startKey * @param endKey * @param numRegions the number of regions to be created. it has to be greater than 3. diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 4a5231e..64f8e60 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HBaseCluster { this.hbaseCluster.join(); } + public List findRegionsForTable(byte[] tableName) { + ArrayList ret = new ArrayList(); + for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { + HRegionServer hrs = rst.getRegionServer(); + for (HRegion region : hrs.getOnlineRegions(tableName)) { + if (Bytes.equals(region.getTableDesc().getName(), tableName)) { + ret.add(region); + } + } + } + return ret; + } + + protected int getRegionServerIndex(ServerName serverName) { //we have a small number of region servers, this should be fine for now. 
List servers = getRegionServerThreads(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java index e68bae7..0100959 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java @@ -23,9 +23,6 @@ import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; @@ -99,13 +96,7 @@ public class TestFullLogReconstruction { // Load up the table with simple rows and count them int initialCount = TEST_UTIL.loadTable(table, FAMILY); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - int count = 0; - for (Result res : results) { - count++; - } - results.close(); + int count = TEST_UTIL.countRows(table); assertEquals(initialCount, count); @@ -114,15 +105,8 @@ public class TestFullLogReconstruction { } TEST_UTIL.expireRegionServerSession(0); - scan = new Scan(); - results = table.getScanner(scan); - int newCount = 0; - for (Result res : results) { - newCount++; - } + int newCount = TEST_UTIL.countRows(table); assertEquals(count, newCount); - results.close(); table.close(); } - -} +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java new file mode 100644 index 0000000..d01448a --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; +import org.apache.log4j.Level; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test for the case where a regionserver going down has enough cycles to do damage to regions + * that have actually been assigned elsewhere. + * + *

If we happen to assign a region before it is fully done in its old location -- i.e. it is on two servers at the + * same time -- all can work fine until the region on the dying server decides to compact or otherwise + * change the region file set. The region in its new location will then get a surprise when it tries to do something + * w/ a file removed by the region in its old location on the dying server. + * + *

Making a test for this case is a little tough: even if a file is deleted on the namenode, + * a client that opened the file before the delete can keep reading it until something changes the + * state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned + * from the datanode by the NN). + * + *

What we will do below is do an explicit check for existence on the files listed in the region that + * has had some files removed because of a compaction. This sort of hurry's along and makes certain what is a chance + * occurance. + */ +@Category(MediumTests.class) +public class TestIOFencing { + static final Log LOG = LogFactory.getLog(TestIOFencing.class); + static { + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL); + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL); + } + + public abstract static class CompactionBlockerRegion extends HRegion { + volatile int compactCount = 0; + volatile CountDownLatch compactionsBlocked = new CountDownLatch(0); + volatile CountDownLatch compactionsWaiting = new CountDownLatch(0); + + public CompactionBlockerRegion(Path tableDir, HLog log, + FileSystem fs, Configuration confParam, HRegionInfo info, + HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, confParam, info, htd, rsServices); + } + + public void stopCompactions() { + compactionsBlocked = new CountDownLatch(1); + compactionsWaiting = new CountDownLatch(1); + } + + public void allowCompactions() { + LOG.debug("allowing compactions"); + compactionsBlocked.countDown(); + } + public void waitForCompactionToBlock() throws IOException { + try { + LOG.debug("waiting for compaction to block"); + compactionsWaiting.await(); + LOG.debug("compaction block reached"); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + @Override + public boolean compact(CompactionContext compaction, Store store) throws IOException { + try { + return super.compact(compaction, store); + } finally { + compactCount++; + } + } + public int countStoreFiles() { + int count = 0; + for (Store store : stores.values()) { + count += store.getStorefilesCount(); + } + return count; + } + } + + /** + * An override of HRegion that allows us park compactions in a holding pattern and + * then when appropriate for the test, allow them proceed again. + */ + public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion { + + public BlockCompactionsInPrepRegion(Path tableDir, HLog log, + FileSystem fs, Configuration confParam, HRegionInfo info, + HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, confParam, info, htd, rsServices); + } + @Override + protected void doRegionCompactionPrep() throws IOException { + compactionsWaiting.countDown(); + try { + compactionsBlocked.await(); + } catch (InterruptedException ex) { + throw new IOException(); + } + super.doRegionCompactionPrep(); + } + } + + /** + * An override of HRegion that allows us park compactions in a holding pattern and + * then when appropriate for the test, allow them proceed again. 
This allows the compaction + * entry to go the WAL before blocking, but blocks afterwards + */ + public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion { + public BlockCompactionsInCompletionRegion(Path tableDir, HLog log, + FileSystem fs, Configuration confParam, HRegionInfo info, + HTableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, log, fs, confParam, info, htd, rsServices); + } + protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException { + return new BlockCompactionsInCompletionHStore(this, family, this.conf); + } + } + + public static class BlockCompactionsInCompletionHStore extends HStore { + CompactionBlockerRegion r; + protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family, + Configuration confParam) throws IOException { + super(region, family, confParam); + r = (CompactionBlockerRegion) region; + } + + @Override + protected void completeCompaction(Collection compactedFiles, + Collection result) throws IOException { + try { + r.compactionsWaiting.countDown(); + r.compactionsBlocked.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + super.completeCompaction(compactedFiles, result); + } + } + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest"); + private final static byte[] FAMILY = Bytes.toBytes("family"); + private static final int FIRST_BATCH_COUNT = 4000; + private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT; + + /** + * Test that puts up a regionserver, starts a compaction on a loaded region but holds the + * compaction until after we have killed the server and the region has come up on + * a new regionserver altogether. This fakes the double assignment case where region in one + * location changes the files out from underneath a region being served elsewhere. + */ + @Test + public void testFencingAroundCompaction() throws Exception { + doTest(BlockCompactionsInPrepRegion.class); + } + + /** + * Test that puts up a regionserver, starts a compaction on a loaded region but holds the + * compaction completion until after we have killed the server and the region has come up on + * a new regionserver altogether. This fakes the double assignment case where region in one + * location changes the files out from underneath a region being served elsewhere. + */ + @Test + public void testFencingAroundCompactionAfterWALSync() throws Exception { + doTest(BlockCompactionsInCompletionRegion.class); + } + + public void doTest(Class regionClass) throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + // Insert our custom region + c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class); + c.setBoolean("dfs.support.append", true); + // Encourage plenty of flushes + c.setLong("hbase.hregion.memstore.flush.size", 200000); + c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); + // Only run compaction when we tell it to + c.setInt("hbase.hstore.compactionThreshold", 1000); + c.setLong("hbase.hstore.blockingStoreFiles", 1000); + // Compact quickly after we tell it to! 
+ c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000); + LOG.info("Starting mini cluster"); + TEST_UTIL.startMiniCluster(1); + CompactionBlockerRegion compactingRegion = null; + HBaseAdmin admin = null; + try { + LOG.info("Creating admin"); + admin = new HBaseAdmin(c); + LOG.info("Creating table"); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + HTable table = new HTable(c, TABLE_NAME); + LOG.info("Loading test table"); + // Load some rows + TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); + // Find the region + List testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME); + assertEquals(1, testRegions.size()); + compactingRegion = (CompactionBlockerRegion)testRegions.get(0); + assertTrue(compactingRegion.countStoreFiles() > 1); + final byte REGION_NAME[] = compactingRegion.getRegionName(); + LOG.info("Blocking compactions"); + compactingRegion.stopCompactions(); + LOG.info("Asking for compaction"); + admin.majorCompact(TABLE_NAME); + LOG.info("Waiting for compaction to be about to start"); + compactingRegion.waitForCompactionToBlock(); + LOG.info("Starting a new server"); + RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer(); + HRegionServer newServer = newServerThread.getRegionServer(); + LOG.info("Killing region server ZK lease"); + TEST_UTIL.expireRegionServerSession(0); + CompactionBlockerRegion newRegion = null; + long startWaitTime = System.currentTimeMillis(); + while (newRegion == null) { + LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME)); + Thread.sleep(100); + newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME); + assertTrue("Timed out waiting for new server to open region", + System.currentTimeMillis() - startWaitTime < 60000); + } + LOG.info("Allowing compaction to proceed"); + compactingRegion.allowCompactions(); + while (compactingRegion.compactCount == 0) { + Thread.sleep(1000); + } + // The server we killed stays up until the compaction that was started before it was killed completes. In logs + // you should see the old regionserver now going down. + LOG.info("Compaction finished"); + // After compaction of old region finishes on the server that was going down, make sure that + // all the files we expect are still working when region is up in new location. + FileSystem fs = newRegion.getFilesystem(); + for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) { + assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f))); + } + // If we survive the split keep going... + // Now we make sure that the region isn't totally confused. Load up more rows. 
+ TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); + admin.majorCompact(TABLE_NAME); + startWaitTime = System.currentTimeMillis(); + while (newRegion.compactCount == 0) { + Thread.sleep(1000); + assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 30000); + } + assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table)); + } finally { + if (compactingRegion != null) { + compactingRegion.allowCompactions(); + } + admin.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 41dd6b7..2dd1189 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -350,7 +350,7 @@ public class TestRowProcessorEndpoint { // We can also inject some meta data to the walEdit KeyValue metaKv = new KeyValue( - row, HLog.METAFAMILY, + row, WALEdit.METAFAMILY, Bytes.toBytes("I just increment counter"), Bytes.toBytes(counter)); walEdit.add(metaKv); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 8c2fe1d..a40b71f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -35,18 +35,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; -import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; @@ -65,7 +66,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import 
org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.exceptions.NotServingRegionException; import org.apache.hadoop.hbase.exceptions.WrongRegionException; @@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; @@ -382,6 +385,95 @@ public class TestHRegion extends HBaseTestCase { } } + @Test + public void testRecoveredEditsReplayCompaction() throws Exception { + String method = "testRecoveredEditsReplayCompaction"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + this.region = initHRegion(tableName, method, conf, family); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + long maxSeqId = 3; + long minSeqId = 0; + + for (long i = minSeqId; i < maxSeqId; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(family, Bytes.toBytes(i), Bytes.toBytes(i)); + region.put(put); + region.flushcache(); + } + + //this will create a region with 3 files + assertEquals(3, region.getStore(family).getStorefilesCount()); + List storeFiles = new ArrayList(3); + for (StoreFile sf : region.getStore(family).getStorefiles()) { + storeFiles.add(sf.getPath()); + } + + //disable compaction completion + conf.setBoolean("hbase.hstore.compaction.complete",false); + region.compactStores(); + + //ensure that nothing changed + assertEquals(3, region.getStore(family).getStorefilesCount()); + + //now find the compacted file, and manually add it to the recovered edits + Path tmpDir = region.getRegionFileSystem().getTempDir(); + FileStatus[] files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir); + assertEquals(1, files.length); + //move the file inside region dir + Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), files[0].getPath()); + + CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor( + this.region.getRegionInfo(), family, + storeFiles, Lists.newArrayList(newFile), + region.getRegionFileSystem().getStoreDir(Bytes.toString(family))); + + HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(), + this.region.getRegionInfo(), compactionDescriptor); + + Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); + + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); + fs.create(recoveredEdits); + HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf); + + long time = System.nanoTime(); + + writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time, HConstants.DEFAULT_CLUSTER_ID), + WALEdit.createCompaction(compactionDescriptor))); + writer.close(); + + //close the region now, and reopen again + HTableDescriptor htd = region.getTableDesc(); + HRegionInfo info = region.getRegionInfo(); + region.close(); + region = HRegion.openHRegion(conf, fs, regiondir.getParent().getParent(), info, htd, null); + + //now 
check whether we have only one store file, the compacted one + Collection sfs = region.getStore(family).getStorefiles(); + for (StoreFile sf : sfs) { + LOG.info(sf.getPath()); + } + assertEquals(1, region.getStore(family).getStorefilesCount()); + files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir); + assertEquals(0, files.length); + + for (long i = minSeqId; i < maxSeqId; i++) { + Get get = new Get(Bytes.toBytes(i)); + Result result = region.get(get); + byte[] value = result.getValue(family, Bytes.toBytes(i)); + assertTrue(Bytes.equals(Bytes.toBytes(i), value)); + } + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + public void testGetWhileRegionClose() throws IOException { Configuration hc = initSplit(); int numRows = 100;