diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java index 5879e96..ab8540c 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java @@ -589,11 +589,485 @@ public final class MapReduceProtos { // @@protoc_insertion_point(class_scope:ScanMetrics) } + public interface SnapshotRegionSplitOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional .RegionSpecifier regionSpecifier = 1; + boolean hasRegionSpecifier(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegionSpecifier(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionSpecifierOrBuilder(); + } + public static final class SnapshotRegionSplit extends + com.google.protobuf.GeneratedMessage + implements SnapshotRegionSplitOrBuilder { + // Use SnapshotRegionSplit.newBuilder() to construct. + private SnapshotRegionSplit(Builder builder) { + super(builder); + } + private SnapshotRegionSplit(boolean noInit) {} + + private static final SnapshotRegionSplit defaultInstance; + public static SnapshotRegionSplit getDefaultInstance() { + return defaultInstance; + } + + public SnapshotRegionSplit getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_SnapshotRegionSplit_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_SnapshotRegionSplit_fieldAccessorTable; + } + + private int bitField0_; + // optional .RegionSpecifier regionSpecifier = 1; + public static final int REGIONSPECIFIER_FIELD_NUMBER = 1; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier regionSpecifier_; + public boolean hasRegionSpecifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegionSpecifier() { + return regionSpecifier_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionSpecifierOrBuilder() { + return regionSpecifier_; + } + + private void initFields() { + regionSpecifier_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (hasRegionSpecifier()) { + if (!getRegionSpecifier().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, regionSpecifier_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += 
com.google.protobuf.CodedOutputStream + .computeMessageSize(1, regionSpecifier_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit other = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit) obj; + + boolean result = true; + result = result && (hasRegionSpecifier() == other.hasRegionSpecifier()); + if (hasRegionSpecifier()) { + result = result && getRegionSpecifier() + .equals(other.getRegionSpecifier()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasRegionSpecifier()) { + hash = (37 * hash) + REGIONSPECIFIER_FIELD_NUMBER; + hash = (53 * hash) + getRegionSpecifier().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static 
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplitOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_SnapshotRegionSplit_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_SnapshotRegionSplit_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getRegionSpecifierFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (regionSpecifierBuilder_ == null) { + regionSpecifier_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + } else { + regionSpecifierBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit 
build() { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit result = new org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (regionSpecifierBuilder_ == null) { + result.regionSpecifier_ = regionSpecifier_; + } else { + result.regionSpecifier_ = regionSpecifierBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.getDefaultInstance()) return this; + if (other.hasRegionSpecifier()) { + mergeRegionSpecifier(other.getRegionSpecifier()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (hasRegionSpecifier()) { + if (!getRegionSpecifier().isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.newBuilder(); + if (hasRegionSpecifier()) { + subBuilder.mergeFrom(getRegionSpecifier()); + } + input.readMessage(subBuilder, extensionRegistry); + setRegionSpecifier(subBuilder.buildPartial()); + break; + } + } + } + } + + private int bitField0_; + + // optional .RegionSpecifier regionSpecifier = 1; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier regionSpecifier_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + 
private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> regionSpecifierBuilder_; + public boolean hasRegionSpecifier() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegionSpecifier() { + if (regionSpecifierBuilder_ == null) { + return regionSpecifier_; + } else { + return regionSpecifierBuilder_.getMessage(); + } + } + public Builder setRegionSpecifier(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) { + if (regionSpecifierBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + regionSpecifier_ = value; + onChanged(); + } else { + regionSpecifierBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + public Builder setRegionSpecifier( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder builderForValue) { + if (regionSpecifierBuilder_ == null) { + regionSpecifier_ = builderForValue.build(); + onChanged(); + } else { + regionSpecifierBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + public Builder mergeRegionSpecifier(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) { + if (regionSpecifierBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + regionSpecifier_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance()) { + regionSpecifier_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.newBuilder(regionSpecifier_).mergeFrom(value).buildPartial(); + } else { + regionSpecifier_ = value; + } + onChanged(); + } else { + regionSpecifierBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + public Builder clearRegionSpecifier() { + if (regionSpecifierBuilder_ == null) { + regionSpecifier_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + onChanged(); + } else { + regionSpecifierBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder getRegionSpecifierBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getRegionSpecifierFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionSpecifierOrBuilder() { + if (regionSpecifierBuilder_ != null) { + return regionSpecifierBuilder_.getMessageOrBuilder(); + } else { + return regionSpecifier_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> + getRegionSpecifierFieldBuilder() { + if (regionSpecifierBuilder_ == null) { + regionSpecifierBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>( + regionSpecifier_, + getParentForChildren(), + 
isClean()); + regionSpecifier_ = null; + } + return regionSpecifierBuilder_; + } + + // @@protoc_insertion_point(builder_scope:SnapshotRegionSplit) + } + + static { + defaultInstance = new SnapshotRegionSplit(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SnapshotRegionSplit) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_ScanMetrics_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ScanMetrics_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SnapshotRegionSplit_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SnapshotRegionSplit_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -604,9 +1078,11 @@ public final class MapReduceProtos { static { java.lang.String[] descriptorData = { "\n\017MapReduce.proto\032\013hbase.proto\".\n\013ScanMe" + - "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64PairBB" + - "\n*org.apache.hadoop.hbase.protobuf.gener" + - "atedB\017MapReduceProtosH\001\240\001\001" + "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64Pair\"@" + + "\n\023SnapshotRegionSplit\022)\n\017regionSpecifier" + + "\030\001 \001(\0132\020.RegionSpecifierBB\n*org.apache.h" + + "adoop.hbase.protobuf.generatedB\017MapReduc" + + "eProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -621,6 +1097,14 @@ public final class MapReduceProtos { new java.lang.String[] { "Metrics", }, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder.class); + internal_static_SnapshotRegionSplit_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_SnapshotRegionSplit_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SnapshotRegionSplit_descriptor, + new java.lang.String[] { "RegionSpecifier", }, + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.class, + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.SnapshotRegionSplit.Builder.class); return null; } }; diff --git hbase-protocol/src/main/protobuf/MapReduce.proto hbase-protocol/src/main/protobuf/MapReduce.proto index 0ac70cd..310a5f3 100644 --- hbase-protocol/src/main/protobuf/MapReduce.proto +++ hbase-protocol/src/main/protobuf/MapReduce.proto @@ -18,15 +18,17 @@ //This file includes protocol buffers used in MapReduce only. 
- option java_package = "org.apache.hadoop.hbase.protobuf.generated"; - option java_outer_classname = "MapReduceProtos"; - option java_generate_equals_and_hash = true; - option optimize_for = SPEED; +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "MapReduceProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; - import "hbase.proto"; +import "hbase.proto"; - message ScanMetrics { +message ScanMetrics { + repeated NameInt64Pair metrics = 1; +} - repeated NameInt64Pair metrics = 1; - - } \ No newline at end of file +message SnapshotRegionSplit { + optional RegionSpecifier regionSpecifier = 1; +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 7b10b1f..97c43c3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -110,6 +110,32 @@ public class TableMapReduceUtil { job, true); } + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Class inputFormatClass) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true, inputFormatClass); + } + + /** * Use this before submitting a TableMap job. It will appropriately set up * the job. @@ -123,13 +149,14 @@ public class TableMapReduceUtil { * carrying all necessary HBase configuration. * @param addDependencyJars upload HBase jars and jars for any of the configured * job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job * @throws IOException When setting up the details fails. */ public static void initTableMapperJob(String table, Scan scan, Class mapper, Class outputKeyClass, Class outputValueClass, Job job, - boolean addDependencyJars, Class inputFormatClass) + boolean addDependencyJars, boolean initCredentials, Class inputFormatClass) throws IOException { job.setInputFormatClass(inputFormatClass); if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); @@ -148,7 +175,9 @@ public class TableMapReduceUtil { if (addDependencyJars) { addDependencyJars(job); } - initCredentials(job); + if (initCredentials) { + initCredentials(job); + } } /** @@ -228,6 +257,33 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. 
+ * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName); + initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); + } + + /** * Use this before submitting a Multi TableMap job. It will appropriately set * up the job. * @@ -609,9 +665,9 @@ public class TableMapReduceUtil { * Find a jar that contains a class of the same name, if any. * It will return a jar file, even if that is not the first thing * on the class path that has a class with the same name. - * + * * This is shamelessly copied from JobConf - * + * * @param my_class the class to find. * @return a jar file that contains the class, or null. * @throws IOException diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java new file mode 100644 index 0000000..a6341ff --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.google.common.annotations.VisibleForTesting; + +/** + * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job + * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits, + * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be + * restored or cloned. This also allows to run the mapreduce job from an online or offline + * hbase cluster. The snapshot files can be exported by using the + * {@link ExportSnapshot} tool, to a pure-hdfs cluster, and this InputFormat can be used to + * run the mapreduce job directly over the snapshot files. + *
+ * <p>
+ * Usage is similar to TableInputFormat, and + * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean)} + * can be used to configure the job. + *
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ *      scan, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true);
+ * }
+ * }
+ * </pre>
+ * <p>
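+ * The {@code initTableSnapshotMapperJob} helper above calls {@link #setInput(Job, String)}
+ * internally, so no additional configuration of this input format is required beyond the call
+ * shown above.
+ * <p>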
+ * Internally, this input format restores the snapshot into the working directory. Similar to
+ * {@link TableInputFormat}, an InputSplit is created per region, and each region is opened for
+ * reading by its RecordReader. An internal RegionScanner is used to execute the {@link Scan}
+ * obtained from the user.
+ *
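+ * The restore step creates reference files (links) to the snapshot's hfiles rather than copying
+ * table data.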
+ * <p>
+ * The user has to have sufficient access rights in the file system to access the snapshot + * files, and referenced files. + */ +public class TableSnapshotInputFormat extends InputFormat { + // TODO: Snapshots files are owned in fs by the hbase user. There is no + // easy way to delegate access. + + private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class); + + private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; + private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir"; + + @VisibleForTesting + class SnapshotRegionSplit extends InputSplit implements Writable { + String regionName; + SnapshotRegionSplit(String regionName) { + this.regionName = regionName; + } + @Override + public long getLength() throws IOException, InterruptedException { + //TODO + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + //TODO + return null; + } + @Override + public void write(DataOutput out) throws IOException { + //TODO: PB this +// MapReduceProtos.SnapshotRegionSplit split = +// MapReduceProtos.SnapshotRegionSplit.newBuilder() +// .setRegionSpecifier( +// RegionSpecifier.newBuilder() +// .setType(RegionSpecifierType.ENCODED_REGION_NAME) +// .setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build()).build(); +// split.writeDelimitedTo(out); + WritableUtils.writeString(out, regionName); + } + @Override + public void readFields(DataInput in) throws IOException { + WritableUtils.readString(in); + } + } + + @VisibleForTesting + class SnapshotRegionRecordReader extends RecordReader { + private SnapshotRegionSplit split; + private HRegion region; + private Scan scan; + RegionScanner scanner; + List values; + Result result = null; + ImmutableBytesWritable row = null; + boolean more; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + + Configuration conf = context.getConfiguration(); + this.split = (SnapshotRegionSplit) split; + String regionName = this.split.regionName; + String snapshotName = getSnapshotName(conf); + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + //load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, snapshotDir); + + //load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + + // create scan + String scanStr = conf.get(TableInputFormat.SCAN); + if (scanStr == null) { + throw new IllegalArgumentException("A Scan is not configured for this job"); + } + try { + scan = TableMapReduceUtil.convertStringToScan(scanStr); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine, + // otherwise we have to set the thread read point + + Path tableDir = new Path(conf.get(TABLE_DIR_KEY)); + // open region from the snapshot directory + this.region = HRegion.openHRegion(conf, fs, rootDir, tableDir, hri, htd, null, null, null); + + // create region scanner + this.scanner = region.getScanner(scan); + values = new ArrayList(); + this.more = true; + + 
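+      // hold the region operation (read) lock until close() calls closeRegionOperation(), so the
+      // region cannot be closed out from under the open scanner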
region.startRegionOperation(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + values.clear(); + // RegionScanner.next() has a different contract than RecordReader.nextKeyValue(). Scanner + // indicates no value read by returning empty results. Returns boolean indicates if more + // rows exist AFTER this one + if (!more) { + return false; + } + more = scanner.nextRaw(values, scan.getBatch()); + if (values == null || values.isEmpty()) { + //we are done + return false; + } + + this.result = new Result(values); + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + + return true; + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return row; + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return result; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + if (this.scanner != null) { + this.scanner.close(); + } + if (region != null) { + region.closeRegionOperation(); + region.close(true); + } + } + } + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new SnapshotRegionRecordReader(); + } + + @Override + public List getSplits(JobContext job) throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + String snapshotName = getSnapshotName(job.getConfiguration()); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + Set snapshotRegionNames = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir); + if (snapshotRegionNames == null) { + throw new IllegalArgumentException("Snapshot seems empty"); + } + + //TODO: take a peek at the passed scan object. Include only the regions that fall in the range + + List splits = new ArrayList(snapshotRegionNames.size()); + for (String region : snapshotRegionNames) { + splits.add(new SnapshotRegionSplit(region)); + } + + return splits; + } + + public static void setInput(Job job, String snapshotName) throws IOException { + Configuration conf = job.getConfiguration(); + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + //load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, snapshotDir); + + Path tableDir = new Path(new Path("/users/enis"), UUID.randomUUID().toString()); //TODO change this, delete afterwards + + MonitoredTask status = TaskMonitor.get().createStatus( + "Restoring snapshot '" + snapshotName + "' to directory " + tableDir); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(); + + RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc, + snapshotDir, htd, tableDir, monitor, status); + helper.restoreHdfsRegions(); // TODO: restore from record readers to parallelize. 
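+    // the SnapshotRegionRecordReaders open their regions from this restored layout (TABLE_DIR_KEY);
+    // the snapshot files themselves are only referenced, not modified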
+ + if (LOG.isDebugEnabled()) { + FSUtils.logFileSystemState(fs, tableDir, LOG); + } + + conf.set(TABLE_DIR_KEY, tableDir.toString()); + } + + private static String getSnapshotName(Configuration conf) { + String snapshotName = conf.get(SNAPSHOT_NAME_KEY); + if (snapshotName == null) { + throw new IllegalArgumentException("Snapshot name must be provided"); + } + return snapshotName; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7cd9a18..fc3e74c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.IsolationLevel; @@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; @@ -478,7 +478,7 @@ public class HRegion implements HeapSize { // , Writable{ // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled. 
this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0; - + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -2049,8 +2049,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2086,7 +2086,7 @@ public class HRegion implements HeapSize { // , Writable{ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); - Durability tmpDur = m.getDurability(); + Durability tmpDur = m.getDurability(); if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } @@ -2136,8 +2136,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -3396,10 +3396,10 @@ public class HRegion implements HeapSize { // , Writable{ public long getMvccReadPoint() { return this.readPt; } - + /** * Reset both the filter and the old filter. - * + * * @throws IOException in case a filter raises an I/O exception. */ protected void resetFilters() throws IOException { @@ -3608,7 +3608,7 @@ public class HRegion implements HeapSize { // , Writable{ // If joinedHeap is pointing to some other row, try to seek to a correct one. boolean mayHaveData = (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length)) - || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length), + || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length), true, true) && joinedHeap.peek() != null && joinedHeap.peek().matchingRow(currentRow, offset, length)); @@ -3841,11 +3841,36 @@ public class HRegion implements HeapSize { // , Writable{ final HLog hlog, final boolean initialize, final boolean ignoreHLog) throws IOException { + Path tableDir = HTableDescriptor.getTableDir(rootDir, info.getTableName()); + return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog); + } + + /** + * Convenience method creating new HRegions. Used by createTable. + * The {@link HLog} for the created region needs to be closed + * explicitly, if it is not null. + * Use {@link HRegion#getLog()} to get access. + * + * @param info Info for region to create. 
+ * @param rootDir Root directory for HBase instance + * @param conf + * @param hTableDescriptor + * @param hlog shared HLog + * @param initialize - true to initialize the region + * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable + * @return new HRegion + * @throws IOException + */ + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir, + final Configuration conf, + final HTableDescriptor hTableDescriptor, + final HLog hlog, + final boolean initialize, final boolean ignoreHLog) + throws IOException { LOG.info("creating HRegion " + info.getTableNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTableNameAsString()); - Path tableDir = HTableDescriptor.getTableDir(rootDir, info.getTableName()); FileSystem fs = FileSystem.get(conf); HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info); HLog effectiveHLog = hlog; @@ -3861,6 +3886,8 @@ public class HRegion implements HeapSize { // , Writable{ return region; } + + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor hTableDescriptor, @@ -4001,13 +4028,38 @@ public class HRegion implements HeapSize { // , Writable{ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal, final RegionServerServices rsServices, final CancelableProgressable reporter) throws IOException { + Path tableDir = HTableDescriptor.getTableDir(rootDir, info.getTableName()); + + return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter); + } + + /** + * Open a Region. + * @param conf The Configuration object to use. + * @param fs Filesystem to use + * @param rootDir Root directory for HBase instance + * @param tableDir Table directory + * @param info Info for region to be opened. + * @param htd the table descriptor + * @param wal HLog for region to use. This method will call + * HLog#setSequenceNumber(long) passing the result of the call to + * HRegion#getMinSequenceId() to ensure the log id is properly kept + * up. HRegionStore does this every time it opens a new region. + * @param rsServices An interface we can request flushes against. + * @param reporter An interface we can report progress against. 
+ * @return new HRegion + * @throws IOException + */ + public static HRegion openHRegion(final Configuration conf, final FileSystem fs, + final Path rootDir, Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal, + final RegionServerServices rsServices, final CancelableProgressable reporter) + throws IOException { if (info == null) throw new NullPointerException("Passed region info is null"); LOG.info("HRegion.openHRegion Region name ==" + info.getRegionNameAsString()); if (LOG.isDebugEnabled()) { LOG.debug("Opening region: " + info); } - Path dir = HTableDescriptor.getTableDir(rootDir, info.getTableName()); - HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices); + HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices); return r.openHRegion(reporter); } @@ -4216,7 +4268,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for region: " + b); b.getRegionFileSystem().logFileSystemState(LOG); } - + RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true); if (!rmt.prepare(null)) { throw new IOException("Unable to merge regions " + a + " and " + b); @@ -4242,7 +4294,7 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for new region"); dstRegion.getRegionFileSystem().logFileSystemState(LOG); } - + if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) { throw new IOException("Merged region " + dstRegion + " still has references after the compaction, is compaction canceled?"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java index bae0366..2e7880a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.snapshot; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.HashMap; @@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.backup.HFileArchiver; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; @@ -411,7 +411,7 @@ public class RestoreSnapshotHelper { } // create the regions on disk - ModifyRegionUtils.createRegions(conf, tableDir.getParent(), + ModifyRegionUtils.createRegions(conf, tableDir.getParent(), tableDir, tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() { public void fillRegion(final HRegion region) throws IOException { cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName())); @@ -540,9 +540,11 @@ public class RestoreSnapshotHelper { * Since each log contains different tables data, logs must be split to * extract the table that we are interested in. 
*/ - private void restoreWALs() throws IOException { + public void restoreWALs() throws IOException { final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir, Bytes.toBytes(snapshotDesc.getTable()), regionsMap); + // TODO: use executors to parallelize splitting + // TODO: once split, we do not need to split again for other restores try { // Recover.Edits SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index 932c3cd..3dfff79 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -35,17 +34,11 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.backup.HFileArchiver; -import org.apache.hadoop.hbase.catalog.CatalogTracker; -import org.apache.hadoop.hbase.catalog.MetaEditor; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -92,6 +85,25 @@ public abstract class ModifyRegionUtils { public static List createRegions(final Configuration conf, final Path rootDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, final RegionFillTask task) throws IOException { + Path tableDir = HTableDescriptor.getTableDir(rootDir, hTableDescriptor.getName()); + return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task); + } + + /** + * Create new set of regions on the specified file-system. + * NOTE: that you should add the regions to .META. after this operation. + * + * @param conf {@link Configuration} + * @param rootDir Root directory for HBase instance + * @param tableDir table directory + * @param hTableDescriptor description of the table + * @param newRegions {@link HRegionInfo} that describes the regions to create + * @param task {@link RegionFillTask} custom code to populate region after creation + * @throws IOException + */ + public static List createRegions(final Configuration conf, final Path rootDir, + final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, + final RegionFillTask task) throws IOException { if (newRegions == null) return null; int regionNumber = newRegions.length; ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf, @@ -102,20 +114,7 @@ public abstract class ModifyRegionUtils { for (final HRegionInfo newRegion : newRegions) { completionService.submit(new Callable() { public HRegionInfo call() throws IOException { - // 1. Create HRegion - HRegion region = HRegion.createHRegion(newRegion, - rootDir, conf, hTableDescriptor, null, - false, true); - try { - // 2. 
Custom user code to interact with the created region - if (task != null) { - task.fillRegion(region); - } - } finally { - // 3. Close the new region to flush to disk. Close log file too. - region.close(); - } - return region.getRegionInfo(); + return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task); } }); } @@ -137,6 +136,35 @@ public abstract class ModifyRegionUtils { return regionInfos; } + /** + * Create new set of regions on the specified file-system. + * @param conf {@link Configuration} + * @param rootDir Root directory for HBase instance + * @param tableDir table directory + * @param hTableDescriptor description of the table + * @param newRegion {@link HRegionInfo} that describes the region to create + * @param task {@link RegionFillTask} custom code to populate region after creation + * @throws IOException + */ + public static HRegionInfo createRegion(final Configuration conf, final Path rootDir, + final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion, + final RegionFillTask task) throws IOException { + // 1. Create HRegion + HRegion region = HRegion.createHRegion(newRegion, + rootDir, tableDir, conf, hTableDescriptor, null, + false, true); + try { + // 2. Custom user code to interact with the created region + if (task != null) { + task.fillRegion(region); + } + } finally { + // 3. Close the new region to flush to disk. Close log file too. + region.close(); + } + return region.getRegionInfo(); + } + /* * used by createRegions() to get the thread pool executor based on the * "hbase.hregion.open.and.init.threads.max" property. diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index accf181..40e44cb 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; @@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.exceptions.MasterNotRunningException; import org.apache.hadoop.hbase.exceptions.TableExistsException; import org.apache.hadoop.hbase.exceptions.TableNotEnabledException; @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -1236,24 +1235,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @throws IOException */ public int loadTable(final HTable t, final byte[] f) throws IOException { - t.setAutoFlush(false); - byte[] k = new byte[3]; - int rowCount = 0; - for (byte b1 
= 'a'; b1 <= 'z'; b1++) { - for (byte b2 = 'a'; b2 <= 'z'; b2++) { - for (byte b3 = 'a'; b3 <= 'z'; b3++) { - k[0] = b1; - k[1] = b2; - k[2] = b3; - Put put = new Put(k); - put.add(f, null, k); - t.put(put); - rowCount++; - } - } - } - t.flushCommits(); - return rowCount; + return loadTable(t, new byte[][] {f}); } /** @@ -1264,23 +1246,27 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @throws IOException */ public int loadTable(final HTable t, final byte[][] f) throws IOException { + return loadTable(t, f, null); + } + + /** + * Load table of multiple column families with rows from 'aaa' to 'zzz'. + * @param t Table + * @param f Array of Families to load + * @param value the values of the cells. If null is passed, the row key is used as value + * @return Count of rows loaded. + * @throws IOException + */ + public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException { t.setAutoFlush(false); - byte[] k = new byte[3]; int rowCount = 0; - for (byte b1 = 'a'; b1 <= 'z'; b1++) { - for (byte b2 = 'a'; b2 <= 'z'; b2++) { - for (byte b3 = 'a'; b3 <= 'z'; b3++) { - k[0] = b1; - k[1] = b2; - k[2] = b3; - Put put = new Put(k); - for (int i = 0; i < f.length; i++) { - put.add(f[i], null, k); - } - t.put(put); - rowCount++; - } + for (byte[] row : HBaseTestingUtility.ROWS) { + Put put = new Put(row); + for (int i = 0; i < f.length; i++) { + put.add(f[i], null, value != null ? value : row); } + t.put(put); + rowCount++; } t.flushCommits(); return rowCount; @@ -1377,6 +1363,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return createMultiRegions(getConfiguration(), table, columnFamily); } + /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */ + public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB + static { + int i = 0; + for (byte b1 = 'a'; b1 <= 'z'; b1++) { + for (byte b2 = 'a'; b2 <= 'z'; b2++) { + for (byte b3 = 'a'; b3 <= 'z'; b3++) { + ROWS[i][0] = b1; + ROWS[i][1] = b2; + ROWS[i][2] = b3; + i++; + } + } + } + } + public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), @@ -1937,7 +1939,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /* * Retrieves a splittable region randomly from tableName - * + * * @param tableName name of table * @param maxAttempts maximum number of attempts, unlimited for value of -1 * @return the HRegion chosen, null if none was found within limit of maxAttempts @@ -1956,7 +1958,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } regCount = regions.size(); // There are chances that before we get the region for the table from an RS the region may - // be going for CLOSE. This may be because online schema change is enabled + // be going for CLOSE. 
@@ -1937,7 +1939,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
 
   /*
    * Retrieves a splittable region randomly from tableName
-   * 
+   *
    * @param tableName name of table
    * @param maxAttempts maximum number of attempts, unlimited for value of -1
    * @return the HRegion chosen, null if none was found within limit of maxAttempts
@@ -1956,7 +1958,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       }
       regCount = regions.size();
       // There are chances that before we get the region for the table from an RS the region may
-      // be going for CLOSE.  This may be because online schema change is enabled
+      // be going for CLOSE. This may be because online schema change is enabled
       if (regCount > 0) {
         idx = random.nextInt(regCount);
         // if we have just tried this region, there is no need to try again
@@ -1974,7 +1976,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     } while (maxAttempts == -1 || attempts < maxAttempts);
     return null;
   }
-  
+
   public MiniZooKeeperCluster getZkCluster() {
     return zkCluster;
   }
@@ -2252,10 +2254,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     scanner.close();
     return result;
   }
-  
+
   /**
    * Create region split keys between startkey and endKey
-   * 
+   *
    * @param startKey
    * @param endKey
    * @param numRegions the number of regions to be created. it has to be greater than 3.
@@ -2672,6 +2674,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   }
 
   /**
+   * Returns a {@link Predicate} for checking that table is enabled
+   */
+  public Waiter.Predicate<Exception> predicateTableEnabled(final byte[] tableName) {
+    return new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return getHBaseAdmin().isTableEnabled(tableName);
+      }
+    };
+  }
+
+  /**
    * Create a set of column descriptors with the combination of compression,
    * encoding, bloom codecs available.
    * @return the list of column descriptors
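
predicateTableEnabled() is meant to be paired with the Waiter-based waitFor() that HBaseTestingUtility inherits, which is exactly how the new test below uses it after re-enabling the snapshotted table. A minimal sketch (the class and table name are illustrative):

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.apache.hadoop.hbase.util.Bytes;

  public class WaitForTableEnabledExample {
    public static void main(String[] args) throws Exception {
      HBaseTestingUtility util = new HBaseTestingUtility();
      util.startMiniCluster();
      try {
        byte[] tableName = Bytes.toBytes("exampleTable");
        util.createTable(tableName, Bytes.toBytes("f1"));

        util.getHBaseAdmin().disableTable(tableName);
        util.getHBaseAdmin().enableTable(tableName);

        // Block for up to 60 seconds until the master reports the table as enabled again.
        util.waitFor(60000, util.predicateTableEnabled(tableName));
      } finally {
        util.shutdownMiniCluster();
      }
    }
  }
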
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
new file mode 100644
index 0000000..a145b18
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.SnapshotRegionSplit;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int NUM_REGION_SERVERS = 2;
+  private static final String TABLE_NAME_STR = "test";
+  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+  private static final byte[] TABLE_NAME = Bytes.toBytes(TABLE_NAME_STR);
+  private static FileSystem fs;
+  private static Path rootDir;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  @Before
+  public void setup() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.deleteTable(TABLE_NAME);
+  }
+
+  class TestTableMapper extends TableMapper {
+  }
+
+  @Test
+  public void testForOfflineTableSingleRegion() throws Exception {
+    UTIL.createTable(TABLE_NAME, FAMILIES); // create with one region
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    // make sure we don't fail on listing snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+
+    // put some stuff in the table
+    HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
+    UTIL.loadTable(table, FAMILIES);
+
+    String snapshotName = "testForOfflineTableSingleRegion";
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, TABLE_NAME_STR,
+      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs);
+
+    // after the snapshot, enable the table, change the cell values so that
+    // we make sure we are reading from the snapshot
+    admin.enableTable(TABLE_NAME);
+    UTIL.waitFor(60000, UTIL.predicateTableEnabled(TABLE_NAME));
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    UTIL.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    UTIL.flush(TABLE_NAME);
+
+    Job job = new Job(UTIL.getConfiguration());
+    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+      new Scan(), TestTableMapper.class, NullWritable.class,
+      NullWritable.class, job, false);
+
+    doTest(job, 1);
+
+    admin.deleteSnapshot(snapshotName);
+  }
+
+  private void doTest(Job job, int numRegions) throws IOException, InterruptedException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    List<InputSplit> splits = tsif.getSplits(new JobContext(job.getConfiguration(), null));
+
+    Assert.assertEquals(numRegions, splits.size());
+
+    SeenRowTracker rowTracker = new SeenRowTracker();
+
+    for (int i = 0; i < splits.size(); i++) {
+      // validate input split
+      InputSplit split = splits.get(i);
+      Assert.assertTrue(split instanceof SnapshotRegionSplit);
+
+      // validate record reader
+      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+      RecordReader<ImmutableBytesWritable, Result> rr =
+        tsif.createRecordReader(split, taskAttemptContext);
+      rr.initialize(split, taskAttemptContext);
+
+      // validate we will read all the data back
+      while (rr.nextKeyValue()) {
+        byte[] row = rr.getCurrentKey().get();
+        Result result = rr.getCurrentValue();
+
+        CellScanner scanner = result.cellScanner();
+        while (scanner.advance()) {
+          Cell cell = scanner.current();
+
+          // assert that all Cells in the Result have the same key
+          Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+            cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+        }
+
+        for (int j = 0; j < FAMILIES.length; j++) {
+          byte[] actual = result.getValue(FAMILIES[j], null);
+          Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+            + ", actual:" + Bytes.toString(actual), row, actual);
+        }
+        rowTracker.addRow(row);
+      }
+
+      rr.close();
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  /**
+   * Tracks and validates the table rows generated with
+   * {@link HBaseTestingUtility#loadTable(HTable, byte[])}.
+   */
+  class SeenRowTracker {
+    int dim = 'z' - 'a' + 1;
+    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times each row is seen
+
+    void reset() {
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
+      }
+    }
+
+    int i(byte b) {
+      return b - 'a';
+    }
+
+    void addRow(byte[] row) {
+      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
+    }
+
+    void validate() {
+      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+            int count = seenRows[i(b1)][i(b2)][i(b3)];
+            if (count != 1) {
+              String row = new String(new byte[] {b1, b2, b3});
+              throw new RuntimeException("Row:" + row + " has a seen count of " + count);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // TODO: test from client side
+
+  // TODO: test with scan
+
+  // TODO: test with offline hbase cluster (purely from hdfs)
+
+}
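
One possible shape for the "test from client side" TODO above is to run a complete MapReduce job over the snapshot instead of driving the record reader by hand. The following is only a sketch: RowCheckingMapper, the "Verify"/"rows" counter, and the extra method are illustrative additions to the test class (they reuse its UTIL and FAMILIES fields), and initTableSnapshotMapperJob is assumed to take the same arguments used in testForOfflineTableSingleRegion().

  // Sketch only: a full-job variant of the verification done in doTest().
  public static class RowCheckingMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      // Every cell should still carry the row key as its value, because the snapshot
      // was taken before the "after_snapshot_value" reload.
      byte[] row = Bytes.copy(key.get(), key.getOffset(), key.getLength());
      for (byte[] family : FAMILIES) {
        byte[] actual = value.getValue(family, null);
        if (actual == null || !Bytes.equals(row, actual)) {
          throw new IOException("Unexpected value for row " + Bytes.toString(row));
        }
      }
      context.getCounter("Verify", "rows").increment(1);
    }
  }

  public void runClientSideVerification(String snapshotName) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), RowCheckingMapper.class, NullWritable.class,
      NullWritable.class, job, false);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.NullOutputFormat.class);

    Assert.assertTrue(job.waitForCompletion(true));
    long rows = job.getCounters().findCounter("Verify", "rows").getValue();
    Assert.assertEquals(HBaseTestingUtility.ROWS.length, rows);
  }
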