diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java index 16de7c9..beaf5ee 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.util.Bytes; /** * Helper class for custom client scanners. @@ -28,6 +31,49 @@ import org.apache.hadoop.classification.InterfaceAudience; @InterfaceAudience.Private public abstract class AbstractClientScanner implements ResultScanner { + protected ScanMetrics scanMetrics; + + /** + * Check and initialize if application wants to collect scan metrics + */ + protected void initScanMetrics(Scan scan) { + // check if application wants to collect scan metrics + byte[] enableMetrics = scan.getAttribute( + Scan.SCAN_ATTRIBUTES_METRICS_ENABLE); + if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) { + scanMetrics = new ScanMetrics(); + } + } + + // TODO: should this be at ResultScanner? ScanMetrics is not public API it seems. + public ScanMetrics getScanMetrics() { + return scanMetrics; + } + + /** + * Get nbRows rows. + * How many RPCs are made is determined by the {@link Scan#setCaching(int)} + * setting (or hbase.client.scanner.caching in hbase-site.xml). + * @param nbRows number of rows to return + * @return Between zero and nbRows RowResults. Scan is done + * if returned array is of zero-length (We never return null). + * @throws IOException + */ + @Override + public Result [] next(int nbRows) throws IOException { + // Collect values to be returned here + ArrayList resultSets = new ArrayList(nbRows); + for(int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + @Override public Iterator iterator() { return new Iterator() { @@ -38,6 +84,7 @@ public abstract class AbstractClientScanner implements ResultScanner { // this method is where the actual advancing takes place, but you need // to call next() to consume it. hasNext() will only advance if there // isn't a pending next(). + @Override public boolean hasNext() { if (next == null) { try { @@ -52,6 +99,7 @@ public abstract class AbstractClientScanner implements ResultScanner { // get the pending next item and advance the iterator. returns null if // there is no next item. + @Override public Result next() { // since hasNext() does the real advancing, we call this to determine // if there is a next before proceeding. 
@@ -67,6 +115,7 @@ public abstract class AbstractClientScanner implements ResultScanner { return temp; } + @Override public void remove() { throw new UnsupportedOperationException(); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bc83170..c441c30 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import java.util.ArrayList; import java.util.LinkedList; import org.apache.commons.logging.Log; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; @@ -62,7 +60,6 @@ public class ClientScanner extends AbstractClientScanner { protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. protected Result lastResult = null; - protected ScanMetrics scanMetrics = null; protected final long maxScannerResultSize; private final HConnection connection; private final TableName tableName; @@ -151,11 +148,7 @@ public class ClientScanner extends AbstractClientScanner { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); // check if application wants to collect scan metrics - byte[] enableMetrics = scan.getAttribute( - Scan.SCAN_ATTRIBUTES_METRICS_ENABLE); - if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) { - scanMetrics = new ScanMetrics(); - } + initScanMetrics(scan); // Use the caching from the Scan. If not set, use the default cache setting for this table. if (this.scan.getCaching() > 0) { @@ -170,7 +163,7 @@ public class ClientScanner extends AbstractClientScanner { initializeScannerInConstruction(); } - + protected void initializeScannerInConstruction() throws IOException{ // initialize the scanner nextScanner(this.caching, false); @@ -429,30 +422,6 @@ public class ClientScanner extends AbstractClientScanner { return null; } - /** - * Get nbRows rows. - * How many RPCs are made is determined by the {@link Scan#setCaching(int)} - * setting (or hbase.client.scanner.caching in hbase-site.xml). - * @param nbRows number of rows to return - * @return Between zero and nbRows RowResults. Scan is done - * if returned array is of zero-length (We never return null). 
- * @throws IOException - */ - @Override - public Result [] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); - for(int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - @Override public void close() { if (!scanMetricsPublished) writeScanMetrics(); diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 426572f..9fa602b 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -82,7 +82,7 @@ public final class CellUtil { copyValueTo(cell, output, 0); return output; } - + public static byte[] getTagArray(Cell cell){ byte[] output = new byte[cell.getTagsLength()]; copyTagTo(cell, output, 0); @@ -370,4 +370,16 @@ public final class CellUtil { // Serialization is probably preceded by a length (it is in the KeyValueCodec at least). Bytes.SIZEOF_INT; } + + /** + * Returns true if the first range start1...end1 overlaps with the second range + * start2...end2, assuming the byte arrays represent row keys + */ + public static boolean overlappingKeys(final byte[] start1, final byte[] end1, + final byte[] start2, final byte[] end2) { + return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1, + end2) < 0) + && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, + end1) < 0); + } } diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java new file mode 100644 index 0000000..d031ea1 --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestCellUtil { + + @Test + public void testOverlappingKeys() { + byte[] empty = HConstants.EMPTY_BYTE_ARRAY; + byte[] a = Bytes.toBytes("a"); + byte[] b = Bytes.toBytes("b"); + byte[] c = Bytes.toBytes("c"); + byte[] d = Bytes.toBytes("d"); + + Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, b)); + Assert.assertTrue(CellUtil.overlappingKeys(a, c, a, b)); + Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, c)); + Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, c)); + Assert.assertTrue(CellUtil.overlappingKeys(a, c, b, c)); + Assert.assertTrue(CellUtil.overlappingKeys(a, d, b, c)); + Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, d)); + + Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, b)); + Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, c)); + + Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, b)); + Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, c)); + + Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, b)); + Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, c)); + + Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, empty)); + Assert.assertTrue(CellUtil.overlappingKeys(empty, empty, a, b)); + } +} diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java new file mode 100644 index 0000000..4ff7a82 --- /dev/null +++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.IntegrationTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.experimental.categories.Category; + +/** + * An integration test to test {@link TableSnapshotInputFormat} which enables + * reading directly from snapshot files without going through hbase servers. 
+ * + * This test creates a table and loads it with rows ranging from + * 'aaa' to 'zzz', and for each row, sets the columns f1:(null) and f2:(null) to be + * the same as the row value. + *
+ * aaa, f1: => aaa
+ * aaa, f2: => aaa
+ * aab, f1: => aab
+ * ....
+ * zzz, f2: => zzz
+ * 
+ * + * Then the test creates a snapshot from this table, and overrides the values in the original + * table with the value 'after_snapshot_value'. The test then runs a mapreduce job over the snapshot + * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output file and + * inspected later to verify that the MR job has seen all the values from the snapshot. + * + *

These parameters can be used to configure the job: + *
"IntegrationTestTableSnapshotInputFormat.table" => the name of the table + *
"IntegrationTestTableSnapshotInputFormat.snapshot" => the name of the snapshot + *
"IntegrationTestTableSnapshotInputFormat.numRegions" => number of regions in the table to be created + *
"IntegrationTestTableSnapshotInputFormat.tableDir" => temporary directory to restore the snapshot files + * + */ +@Category(IntegrationTests.class) +// Not runnable as a unit test. See TestTableSnapshotInputFormat +public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase { + + private static final Log LOG = LogFactory.getLog(IntegrationTestTableSnapshotInputFormat.class); + + private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table"; + private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat"; + + private static final String SNAPSHOT_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.snapshot"; + + + private static final String NUM_REGIONS_KEY = "IntegrationTestTableSnapshotInputFormat.numRegions"; + private static final int DEFAULT_NUM_REGIONS = 32; + + private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir"; + + private IntegrationTestingUtility util; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + util = getTestingUtil(conf); + } + + @Override + @Before + public void setUp() throws Exception { + util = getTestingUtil(getConf()); + util.initializeCluster(1); + this.setConf(util.getConfiguration()); + } + + @Override + @After + public void cleanUp() throws Exception { + util.restoreCluster(); + } + + @Override + public int runTestFromCommandLine() throws Exception { + Configuration conf = getConf(); + TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); + String snapshotName = conf.get(SNAPSHOT_NAME_KEY, tableName.getQualifierAsString() + + "_snapshot_" + System.currentTimeMillis()); + int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS); + String tableDirStr = conf.get(TABLE_DIR_KEY); + Path tableDir; + if (tableDirStr == null) { + tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString()); + } else { + tableDir = new Path(tableDirStr); + } + + /* We create the table using HBaseAdmin#createTable(), which will create the table + * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if + * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we + * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow + * bbb and endRow yyy, so, we expect the first and last region to be filtered out in + * the input format, and we expect numRegions - 2 splits between bbb and yyy. + */ + int expectedNumSplits = numRegions > 2 ? 
numRegions - 2 : numRegions; + + TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir, + numRegions, expectedNumSplits, false); + + return 0; + } + + @Override // CM is not intended to be run with this test + public String getTablename() { + return null; + } + + @Override + protected Set getColumnFamilies() { + return null; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args); + System.exit(ret); + } + +} diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java index fb617fb..1457fe8 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java @@ -717,11 +717,764 @@ public final class MapReduceProtos { // @@protoc_insertion_point(class_scope:ScanMetrics) } + public interface TableSnapshotRegionSplitOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional .RegionSpecifier region = 1; + /** + * optional .RegionSpecifier region = 1; + */ + boolean hasRegion(); + /** + * optional .RegionSpecifier region = 1; + */ + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion(); + /** + * optional .RegionSpecifier region = 1; + */ + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder(); + + // repeated string locations = 2; + /** + * repeated string locations = 2; + */ + java.util.List + getLocationsList(); + /** + * repeated string locations = 2; + */ + int getLocationsCount(); + /** + * repeated string locations = 2; + */ + java.lang.String getLocations(int index); + /** + * repeated string locations = 2; + */ + com.google.protobuf.ByteString + getLocationsBytes(int index); + } + /** + * Protobuf type {@code TableSnapshotRegionSplit} + */ + public static final class TableSnapshotRegionSplit extends + com.google.protobuf.GeneratedMessage + implements TableSnapshotRegionSplitOrBuilder { + // Use TableSnapshotRegionSplit.newBuilder() to construct. 
+ private TableSnapshotRegionSplit(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private TableSnapshotRegionSplit(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final TableSnapshotRegionSplit defaultInstance; + public static TableSnapshotRegionSplit getDefaultInstance() { + return defaultInstance; + } + + public TableSnapshotRegionSplit getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private TableSnapshotRegionSplit( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = region_.toBuilder(); + } + region_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(region_); + region_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + locations_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000002; + } + locations_.add(input.readBytes()); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + locations_ = new com.google.protobuf.UnmodifiableLazyStringList(locations_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public TableSnapshotRegionSplit parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws 
com.google.protobuf.InvalidProtocolBufferException { + return new TableSnapshotRegionSplit(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional .RegionSpecifier region = 1; + public static final int REGION_FIELD_NUMBER = 1; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_; + /** + * optional .RegionSpecifier region = 1; + */ + public boolean hasRegion() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .RegionSpecifier region = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() { + return region_; + } + /** + * optional .RegionSpecifier region = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() { + return region_; + } + + // repeated string locations = 2; + public static final int LOCATIONS_FIELD_NUMBER = 2; + private com.google.protobuf.LazyStringList locations_; + /** + * repeated string locations = 2; + */ + public java.util.List + getLocationsList() { + return locations_; + } + /** + * repeated string locations = 2; + */ + public int getLocationsCount() { + return locations_.size(); + } + /** + * repeated string locations = 2; + */ + public java.lang.String getLocations(int index) { + return locations_.get(index); + } + /** + * repeated string locations = 2; + */ + public com.google.protobuf.ByteString + getLocationsBytes(int index) { + return locations_.getByteString(index); + } + + private void initFields() { + region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + locations_ = com.google.protobuf.LazyStringArrayList.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (hasRegion()) { + if (!getRegion().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, region_); + } + for (int i = 0; i < locations_.size(); i++) { + output.writeBytes(2, locations_.getByteString(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, region_); + } + { + int dataSize = 0; + for (int i = 0; i < locations_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(locations_.getByteString(i)); + } + size += dataSize; + size += 1 * getLocationsList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof 
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) obj; + + boolean result = true; + result = result && (hasRegion() == other.hasRegion()); + if (hasRegion()) { + result = result && getRegion() + .equals(other.getRegion()); + } + result = result && getLocationsList() + .equals(other.getLocationsList()); + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasRegion()) { + hash = (37 * hash) + REGION_FIELD_NUMBER; + hash = (53 * hash) + getRegion().hashCode(); + } + if (getLocationsCount() > 0) { + hash = (37 * hash) + LOCATIONS_FIELD_NUMBER; + hash = (53 * hash) + getLocationsList().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException 
{ + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code TableSnapshotRegionSplit} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplitOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getRegionFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (regionBuilder_ == null) { + region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + } else { + regionBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + locations_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit build() { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = buildPartial(); + if (!result.isInitialized()) { 
+ throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = new org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (regionBuilder_ == null) { + result.region_ = region_; + } else { + result.region_ = regionBuilder_.build(); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + locations_ = new com.google.protobuf.UnmodifiableLazyStringList( + locations_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.locations_ = locations_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance()) return this; + if (other.hasRegion()) { + mergeRegion(other.getRegion()); + } + if (!other.locations_.isEmpty()) { + if (locations_.isEmpty()) { + locations_ = other.locations_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureLocationsIsMutable(); + locations_.addAll(other.locations_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (hasRegion()) { + if (!getRegion().isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional .RegionSpecifier region = 1; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> regionBuilder_; + /** + * optional .RegionSpecifier region = 1; + */ + public boolean hasRegion() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .RegionSpecifier region = 1; + */ + public 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() { + if (regionBuilder_ == null) { + return region_; + } else { + return regionBuilder_.getMessage(); + } + } + /** + * optional .RegionSpecifier region = 1; + */ + public Builder setRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) { + if (regionBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + region_ = value; + onChanged(); + } else { + regionBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * optional .RegionSpecifier region = 1; + */ + public Builder setRegion( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder builderForValue) { + if (regionBuilder_ == null) { + region_ = builderForValue.build(); + onChanged(); + } else { + regionBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * optional .RegionSpecifier region = 1; + */ + public Builder mergeRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) { + if (regionBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + region_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance()) { + region_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.newBuilder(region_).mergeFrom(value).buildPartial(); + } else { + region_ = value; + } + onChanged(); + } else { + regionBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * optional .RegionSpecifier region = 1; + */ + public Builder clearRegion() { + if (regionBuilder_ == null) { + region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); + onChanged(); + } else { + regionBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * optional .RegionSpecifier region = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder getRegionBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getRegionFieldBuilder().getBuilder(); + } + /** + * optional .RegionSpecifier region = 1; + */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() { + if (regionBuilder_ != null) { + return regionBuilder_.getMessageOrBuilder(); + } else { + return region_; + } + } + /** + * optional .RegionSpecifier region = 1; + */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> + getRegionFieldBuilder() { + if (regionBuilder_ == null) { + regionBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>( + region_, + getParentForChildren(), + isClean()); + region_ = null; + } + return regionBuilder_; + } + + // repeated string locations = 2; + private com.google.protobuf.LazyStringList locations_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureLocationsIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + locations_ = new 
com.google.protobuf.LazyStringArrayList(locations_); + bitField0_ |= 0x00000002; + } + } + /** + * repeated string locations = 2; + */ + public java.util.List + getLocationsList() { + return java.util.Collections.unmodifiableList(locations_); + } + /** + * repeated string locations = 2; + */ + public int getLocationsCount() { + return locations_.size(); + } + /** + * repeated string locations = 2; + */ + public java.lang.String getLocations(int index) { + return locations_.get(index); + } + /** + * repeated string locations = 2; + */ + public com.google.protobuf.ByteString + getLocationsBytes(int index) { + return locations_.getByteString(index); + } + /** + * repeated string locations = 2; + */ + public Builder setLocations( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureLocationsIsMutable(); + locations_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string locations = 2; + */ + public Builder addLocations( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureLocationsIsMutable(); + locations_.add(value); + onChanged(); + return this; + } + /** + * repeated string locations = 2; + */ + public Builder addAllLocations( + java.lang.Iterable values) { + ensureLocationsIsMutable(); + super.addAll(values, locations_); + onChanged(); + return this; + } + /** + * repeated string locations = 2; + */ + public Builder clearLocations() { + locations_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + return this; + } + /** + * repeated string locations = 2; + */ + public Builder addLocationsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureLocationsIsMutable(); + locations_.add(value); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:TableSnapshotRegionSplit) + } + + static { + defaultInstance = new TableSnapshotRegionSplit(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TableSnapshotRegionSplit) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_ScanMetrics_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ScanMetrics_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_TableSnapshotRegionSplit_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TableSnapshotRegionSplit_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -732,9 +1485,11 @@ public final class MapReduceProtos { static { java.lang.String[] descriptorData = { "\n\017MapReduce.proto\032\013HBase.proto\".\n\013ScanMe" + - "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64PairBB" + - "\n*org.apache.hadoop.hbase.protobuf.gener" + - "atedB\017MapReduceProtosH\001\240\001\001" + "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64Pair\"O" + + "\n\030TableSnapshotRegionSplit\022 \n\006region\030\001 \001" + + "(\0132\020.RegionSpecifier\022\021\n\tlocations\030\002 \003(\tB" + + "B\n*org.apache.hadoop.hbase.protobuf.gene" + + "ratedB\017MapReduceProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -747,6 +1502,12 @@ public final class 
MapReduceProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanMetrics_descriptor, new java.lang.String[] { "Metrics", }); + internal_static_TableSnapshotRegionSplit_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_TableSnapshotRegionSplit_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TableSnapshotRegionSplit_descriptor, + new java.lang.String[] { "Region", "Locations", }); return null; } }; diff --git hbase-protocol/src/main/protobuf/MapReduce.proto hbase-protocol/src/main/protobuf/MapReduce.proto index e9ef17a..0c4c29e 100644 --- hbase-protocol/src/main/protobuf/MapReduce.proto +++ hbase-protocol/src/main/protobuf/MapReduce.proto @@ -18,15 +18,18 @@ //This file includes protocol buffers used in MapReduce only. - option java_package = "org.apache.hadoop.hbase.protobuf.generated"; - option java_outer_classname = "MapReduceProtos"; - option java_generate_equals_and_hash = true; - option optimize_for = SPEED; +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "MapReduceProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; - import "HBase.proto"; +import "HBase.proto"; - message ScanMetrics { +message ScanMetrics { + repeated NameInt64Pair metrics = 1; +} - repeated NameInt64Pair metrics = 1; - - } \ No newline at end of file +message TableSnapshotRegionSplit { + optional RegionSpecifier region = 1; + repeated string locations = 2; +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java index 11619d3..88d7d3f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java @@ -38,7 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience; public class HDFSBlocksDistribution { private Map hostAndWeights = null; private long uniqueBlocksTotalWeight = 0; - + /** * Stores the hostname and weight for that hostname. 
* @@ -58,7 +58,7 @@ public class HDFSBlocksDistribution { * Constructor * @param host the host name * @param weight the weight - */ + */ public HostAndWeight(String host, long weight) { this.host = host; this.weight = weight; @@ -67,28 +67,28 @@ public class HDFSBlocksDistribution { /** * add weight * @param weight the weight - */ + */ public void addWeight(long weight) { this.weight += weight; } /** * @return the host name - */ + */ public String getHost() { return host; } /** * @return the weight - */ + */ public long getWeight() { return weight; } /** * comparator used to sort hosts based on weight - */ + */ public static class WeightComparator implements Comparator { @Override public int compare(HostAndWeight l, HostAndWeight r) { @@ -99,7 +99,7 @@ public class HDFSBlocksDistribution { } } } - + /** * Constructor */ @@ -137,12 +137,12 @@ public class HDFSBlocksDistribution { /** * add some weight to the total unique weight * @param weight the weight - */ + */ private void addUniqueWeight(long weight) { uniqueBlocksTotalWeight += weight; } - - + + /** * add some weight to a specific host * @param host the host name @@ -186,14 +186,14 @@ public class HDFSBlocksDistribution { } return weight; } - + /** * @return the sum of all unique blocks' weight */ public long getUniqueBlocksTotalWeight() { return uniqueBlocksTotalWeight; } - + /** * return the locality index of a given host * @param host the host name @@ -207,8 +207,8 @@ public class HDFSBlocksDistribution { } return localityIndex; } - - + + /** * This will add the distribution from input to this object * @param otherBlocksDistribution the other hdfs blocks distribution @@ -223,19 +223,27 @@ public class HDFSBlocksDistribution { } addUniqueWeight(otherBlocksDistribution.getUniqueBlocksTotalWeight()); } - + /** * return the sorted list of hosts in terms of their weights */ public List getTopHosts() { - NavigableSet orderedHosts = new TreeSet( - new HostAndWeight.WeightComparator()); - orderedHosts.addAll(this.hostAndWeights.values()); - List topHosts = new ArrayList(orderedHosts.size()); - for(HostAndWeight haw : orderedHosts.descendingSet()) { + HostAndWeight[] hostAndWeights = getTopHostsWithWeights(); + List topHosts = new ArrayList(hostAndWeights.length); + for(HostAndWeight haw : hostAndWeights) { topHosts.add(haw.getHost()); } return topHosts; } + /** + * return the sorted list of hosts in terms of their weights + */ + public HostAndWeight[] getTopHostsWithWeights() { + NavigableSet orderedHosts = new TreeSet( + new HostAndWeight.WeightComparator()); + orderedHosts.addAll(this.hostAndWeights.values()); + return orderedHosts.descendingSet().toArray(new HostAndWeight[orderedHosts.size()]); + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java new file mode 100644 index 0000000..588bd55 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.mortbay.log.Log; + +/** + * A client scanner for a region opened for read-only on the client side. Assumes region data + * is not changing. + */ +@InterfaceAudience.Private +public class ClientSideRegionScanner extends AbstractClientScanner { + + private HRegion region; + private Scan scan; + RegionScanner scanner; + List values; + Result result = null; + + public ClientSideRegionScanner(Configuration conf, FileSystem fs, + Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { + + this.scan = scan; + + // region is immutable, set isolation level + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + + // open region from the snapshot directory + this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); + + // create an internal region scanner + this.scanner = region.getScanner(scan); + values = new ArrayList(); + + if (scanMetrics == null) { + initScanMetrics(scan); + } else { + this.scanMetrics = scanMetrics; + } + region.startRegionOperation(); + } + + @Override + public Result next() throws IOException { + values.clear(); + + scanner.nextRaw(values, -1); // pass -1 as limit so that we see the whole row. 
+ if (values == null || values.isEmpty()) { + //we are done + return null; + } + + this.result = Result.create(values); + if (this.scanMetrics != null) { + long resultSize = 0; + for (Cell kv : values) { + // TODO add getLength to Cell/use CellUtil#estimatedSizeOf + resultSize += KeyValueUtil.ensureKeyValue(kv).getLength(); + } + this.scanMetrics.countOfBytesInResults.addAndGet(resultSize); + } + + return result; + } + + @Override + public void close() { + if (this.scanner != null) { + try { + this.scanner.close(); + this.scanner = null; + } catch (IOException ex) { + Log.warn("Exception while closing scanner", ex); + } + } + if (this.region != null) { + this.region.closeRegionOperation(); + try { + this.region.close(true); + this.region = null; + } catch (IOException ex) { + Log.warn("Exception while closing region", ex); + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java new file mode 100644 index 0000000..4a2f355 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; +import org.apache.hadoop.hbase.util.FSTableDescriptors; + +/** + * A Scanner which performs a scan over snapshot files. Using this class requires to restore the + * snapshot to a temporary empty directory, which will copy the snapshot reference files into that + * directory. Actual data files are not copied. + * + *

+ * This also allows running the scan from an + * online or offline hbase cluster. The snapshot files can be exported with the + * {@link ExportSnapshot} tool to a pure-hdfs cluster, and this scanner can then be used to + * run the scan directly over the snapshot files. + * + *
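Editor's note: as a minimal, hedged usage sketch (not part of the patch), a client might drive this scanner as below, using the (conf, restoreDir, snapshotName, scan) constructor added in this file. The restore directory, snapshot name, and row range are illustrative assumptions; the snippet assumes the usual imports (Configuration, HBaseConfiguration, Path, Bytes, Result, Scan).

  // Hedged usage sketch -- the paths and names below are illustrative assumptions.
  Configuration conf = HBaseConfiguration.create();
  Path restoreDir = new Path("/tmp/snapshot_restore");               // assumed scratch dir for the restored references
  Scan scan = new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("yyy"));  // assumed row range
  TableSnapshotScanner scanner =
      new TableSnapshotScanner(conf, restoreDir, "my_snapshot", scan); // assumed snapshot name
  try {
    for (Result result : scanner) {
      // process rows read directly from the snapshot files, no region server involved
    }
  } finally {
    scanner.close(); // releases the client-side region opened for the current scan
  }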

+ * An internal RegionScanner is used to execute the {@link Scan} obtained + * from the user for each region in the snapshot. + *

+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from + * snapshot files and data files. HBase also enforces security because all the requests are handled + * by the server layer, and the user cannot read from the data files directly. To read from snapshot + * files directly from the file system, the user who is running the MR job must have sufficient + * permissions to access snapshot and reference files. This means that to run mapreduce over + * snapshot files, the job has to be run as the HBase user or the user must have group or other + * priviledges in the filesystem (See HBASE-8369). Note that, given other users access to read from + * snapshot/data files will completely circumvent the access control enforced by HBase. + * @see TableSnapshotInputFormat + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class TableSnapshotScanner extends AbstractClientScanner { + + private static final Log LOG = LogFactory.getLog(TableSnapshotScanner.class); + + private Configuration conf; + private String snapshotName; + private FileSystem fs; + private Path rootDir; + private Path restoreDir; + private Scan scan; + private ArrayList regions; + private HTableDescriptor htd; + + private ClientSideRegionScanner currentRegionScanner = null; + private int currentRegion = -1; + + public TableSnapshotScanner(Configuration conf, Path restoreDir, + String snapshotName, Scan scan) throws IOException { + this(conf, new Path(conf.get(HConstants.HBASE_DIR)), + restoreDir, snapshotName, scan); + } + + public TableSnapshotScanner(Configuration conf, Path rootDir, + Path restoreDir, String snapshotName, Scan scan) throws IOException { + this.conf = conf; + this.snapshotName = snapshotName; + this.rootDir = rootDir; + this.restoreDir = restoreDir; + this.scan = scan; + this.fs = rootDir.getFileSystem(conf); + init(); + } + + private void init() throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + + //load table descriptor + htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir); + + Set snapshotRegionNames + = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir); + if (snapshotRegionNames == null) { + throw new IllegalArgumentException("Snapshot seems empty"); + } + + regions = new ArrayList(snapshotRegionNames.size()); + for (String regionName : snapshotRegionNames) { + // load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, + regionDir); + + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), + hri.getStartKey(), hri.getEndKey())) { + regions.add(hri); + } + } + + // sort for regions according to startKey. 
+ Collections.sort(regions); + + initScanMetrics(scan); + + RestoreSnapshotHelper.restoreSnapshotForScanner(conf, fs, + rootDir, restoreDir, snapshotName); + } + + @Override + public Result next() throws IOException { + Result result = null; + while (true) { + if (currentRegionScanner == null) { + currentRegion++; + if (currentRegion >= regions.size()) { + return null; + } + + HRegionInfo hri = regions.get(currentRegion); + currentRegionScanner = new ClientSideRegionScanner(conf, fs, + restoreDir, htd, hri, scan, scanMetrics); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); + } + } + + result = currentRegionScanner.next(); + if (result != null) { + return result; + } else { + currentRegionScanner.close(); + currentRegionScanner = null; + } + } + } + + @Override + public void close() { + if (currentRegionScanner != null) { + currentRegionScanner.close(); + } + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 9b0dc13..a80092d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -58,6 +58,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; +import org.cliffc.high_scale_lib.Counter; import com.google.protobuf.InvalidProtocolBufferException; @@ -115,6 +116,32 @@ public class TableMapReduceUtil { job, true); } + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Class inputFormatClass) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true, inputFormatClass); + } + + /** * Use this before submitting a TableMap job. It will appropriately set up * the job. @@ -128,13 +155,16 @@ public class TableMapReduceUtil { * carrying all necessary HBase configuration. * @param addDependencyJars upload HBase jars and jars for any of the configured * job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job + * @param inputFormatClass the input format * @throws IOException When setting up the details fails. 
*/ public static void initTableMapperJob(String table, Scan scan, Class mapper, Class outputKeyClass, Class outputValueClass, Job job, - boolean addDependencyJars, Class inputFormatClass) + boolean addDependencyJars, boolean initCredentials, + Class inputFormatClass) throws IOException { job.setInputFormatClass(inputFormatClass); if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); @@ -153,7 +183,9 @@ public class TableMapReduceUtil { if (addDependencyJars) { addDependencyJars(job); } - initCredentials(job); + if (initCredentials) { + initCredentials(job); + } } /** @@ -233,6 +265,39 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * + * @param tmpTableDir temporary directory in the filesystem to restore the + * table snapshot into. + * @throws IOException When setting up the details fails. + * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Path tmpTableDir) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpTableDir); + initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); + + // We would need even more libraries that hbase-server depends on + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class); + } + + /** * Use this before submitting a Multi TableMap job. It will appropriately set * up the job. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index a21f4e0..51ca992 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -35,11 +35,9 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong; import org.apache.hadoop.util.StringUtils; /** @@ -97,7 +95,7 @@ public class TableRecordReaderImpl { * @return The getCounter method or null if not available. 
* @throws IOException */ - private Method retrieveGetCounterWithStringsParams(TaskAttemptContext context) + protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context) throws IOException { Method m = null; try { @@ -253,11 +251,6 @@ public class TableRecordReaderImpl { * @throws IOException */ private void updateCounters() throws IOException { - // we can get access to counters only if hbase uses new mapreduce APIs - if (this.getCounter == null) { - return; - } - byte[] serializedMetrics = currentScan.getAttribute( Scan.SCAN_ATTRIBUTES_METRICS_DATA); if (serializedMetrics == null || serializedMetrics.length == 0 ) { @@ -266,16 +259,25 @@ public class TableRecordReaderImpl { ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics); + updateCounters(scanMetrics, numRestarts, getCounter, context); + } + + protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, + Method getCounter, TaskAttemptContext context) { + // we can get access to counters only if hbase uses new mapreduce APIs + if (getCounter == null) { + return; + } + try { for (Map.Entry entry:scanMetrics.getMetricsMap().entrySet()) { - Counter ct = (Counter)this.getCounter.invoke(context, + Counter ct = (Counter)getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, entry.getKey()); ct.increment(entry.getValue()); } - - ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, - "NUM_SCANNER_RESTARTS")).increment(numRestarts); + ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, + "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java new file mode 100644 index 0000000..e56279e --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.ClientSideRegionScanner; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableSnapshotScanner; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; + +/** + * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job + * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits, + * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be + * restored to the live cluster or cloned. This also allows to run the mapreduce job from an + * online or offline hbase cluster. The snapshot files can be exported by using the + * {@link ExportSnapshot} tool, to a pure-hdfs cluster, and this InputFormat can be used to + * run the mapreduce job directly over the snapshot files. + *

+ * Usage is similar to TableInputFormat, and + * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)} + * can be used to configure the job. + *

{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ *      scan, MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true, tmpTableDir);
+ * }
+ * 
+ *

+ * Internally, this input format restores the snapshot into the given tmp directory. Similar to + * {@link TableInputFormat}, an InputSplit is created per region. Each RecordReader opens its + * region for reading, and an internal RegionScanner is used to execute the {@link Scan} obtained + * from the user. + *

+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from + * snapshot files and data files. HBase also enforces security because all the requests are handled + * by the server layer, and the user cannot read from the data files directly. To read from snapshot + * files directly from the file system, the user who is running the MR job must have sufficient + * permissions to access snapshot and reference files. This means that to run mapreduce over + * snapshot files, the MR job has to be run as the HBase user or the user must have group or other + * priviledges in the filesystem (See HBASE-8369). Note that, given other users access to read from + * snapshot/data files will completely circumvent the access control enforced by HBase. + * @see TableSnapshotScanner + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class TableSnapshotInputFormat extends InputFormat { + // TODO: Snapshots files are owned in fs by the hbase user. There is no + // easy way to delegate access. + + private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class); + + /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ + private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier"; + private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f; + + private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; + private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir"; + + public static class TableSnapshotRegionSplit extends InputSplit implements Writable { + private String regionName; + private String[] locations; + + // constructor for mapreduce framework / Writable + public TableSnapshotRegionSplit() { } + + TableSnapshotRegionSplit(String regionName, List locations) { + this.regionName = regionName; + if (locations == null || locations.isEmpty()) { + this.locations = new String[0]; + } else { + this.locations = locations.toArray(new String[locations.size()]); + } + } + @Override + public long getLength() throws IOException, InterruptedException { + //TODO: We can obtain the file sizes of the snapshot here. + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return locations; + } + + // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of + // doing this wrapping with Writables. 
+ @Override + public void write(DataOutput out) throws IOException { + MapReduceProtos.TableSnapshotRegionSplit.Builder builder = + MapReduceProtos.TableSnapshotRegionSplit.newBuilder() + .setRegion(RegionSpecifier.newBuilder() + .setType(RegionSpecifierType.ENCODED_REGION_NAME) + .setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build()); + + for (String location : locations) { + builder.addLocations(location); + } + + MapReduceProtos.TableSnapshotRegionSplit split = builder.build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + split.writeTo(baos); + baos.close(); + byte[] buf = baos.toByteArray(); + out.writeInt(buf.length); + out.write(buf); + } + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + byte[] buf = new byte[len]; + in.readFully(buf); + MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf); + this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray()); + List locationsList = split.getLocationsList(); + this.locations = locationsList.toArray(new String[locationsList.size()]); + } + } + + @VisibleForTesting + class TableSnapshotRegionRecordReader extends RecordReader { + private TableSnapshotRegionSplit split; + private Scan scan; + private Result result = null; + private ImmutableBytesWritable row = null; + private ClientSideRegionScanner scanner; + private TaskAttemptContext context; + private Method getCounter; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + + Configuration conf = context.getConfiguration(); + this.split = (TableSnapshotRegionSplit) split; + String regionName = this.split.regionName; + String snapshotName = getSnapshotName(conf); + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root + // directory where snapshot was restored + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + + //load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir); + + //load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + + // create scan + String scanStr = conf.get(TableInputFormat.SCAN); + if (scanStr == null) { + throw new IllegalArgumentException("A Scan is not configured for this job"); + } + scan = TableMapReduceUtil.convertStringToScan(scanStr); + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine, + // otherwise we have to set the thread read point + + scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); + if (context != null) { + this.context = context; + getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + result = scanner.next(); + if (result == null) { + //we are done + return false; + } + + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + + ScanMetrics scanMetrics = scanner.getScanMetrics(); + if (scanMetrics != null && context != null) { + TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context); + } + + return true; + } + + 
@Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return row; + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return result; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; // TODO: use total bytes to estimate + } + + @Override + public void close() throws IOException { + if (this.scanner != null) { + this.scanner.close(); + } + } + } + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new TableSnapshotRegionRecordReader(); + } + + @Override + public List getSplits(JobContext job) throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + String snapshotName = getSnapshotName(conf); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + Set snapshotRegionNames + = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir); + if (snapshotRegionNames == null) { + throw new IllegalArgumentException("Snapshot seems empty"); + } + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, + snapshotDir); + + Scan scan = TableMapReduceUtil.convertStringToScan(conf + .get(TableInputFormat.SCAN)); + Path tableDir = new Path(conf.get(TABLE_DIR_KEY)); + + List splits = new ArrayList(); + for (String regionName : snapshotRegionNames) { + // load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, + regionDir); + + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), + hri.getStartKey(), hri.getEndKey())) { + // compute HDFS locations from snapshot files (which will get the locations for + // referred hfiles) + List hosts = getBestLocations(conf, + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + + int len = Math.min(3, hosts.size()); + hosts = hosts.subList(0, len); + splits.add(new TableSnapshotRegionSplit(regionName, hosts)); + } + } + + return splits; + } + + /** + * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take + * weights into account, thus will treat every location passed from the input split as equal. We + * do not want to blindly pass all the locations, since we are creating one split per region, and + * the region's blocks are all distributed throughout the cluster unless favorite node assignment + * is used. On the expected stable case, only one location will contain most of the blocks as local. + * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here + * we are doing a simple heuristic, where we will pass all hosts which have at least 80% + * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top + * host with the best locality. 
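+ * For example, with the default multiplier of 0.8, if the top host carries blocks with a total + * weight of 100, any following host with a weight of at least 80 (100 * 0.8) is also returned; + * iteration stops at the first host that falls below that cutoff, since hosts are ordered by weight.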
+ */ + @VisibleForTesting + List getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) { + List locations = new ArrayList(3); + + HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); + + if (hostAndWeights.length == 0) { + return locations; + } + + HostAndWeight topHost = hostAndWeights[0]; + locations.add(topHost.getHost()); + + // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality + double cutoffMultiplier + = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); + + double filterWeight = topHost.getWeight() * cutoffMultiplier; + + for (int i = 1; i < hostAndWeights.length; i++) { + if (hostAndWeights[i].getWeight() >= filterWeight) { + locations.add(hostAndWeights[i].getHost()); + } else { + break; + } + } + + return locations; + } + + public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException { + Configuration conf = job.getConfiguration(); + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + // TODO: restore from record readers to parallelize. + RestoreSnapshotHelper.restoreSnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + + conf.set(TABLE_DIR_KEY, restoreDir.toString()); + } + + private static String getSnapshotName(Configuration conf) { + String snapshotName = conf.get(SNAPSHOT_NAME_KEY); + if (snapshotName == null) { + throw new IllegalArgumentException("Snapshot name must be provided"); + } + return snapshotName; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 878d694..3a1bacc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -748,8 +748,23 @@ public class HRegion implements HeapSize { // , Writable{ */ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf, final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException { - HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName()); + return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath); + } + + /** + * This is a helper function to compute HDFS block distribution on demand + * @param conf configuration + * @param tableDescriptor HTableDescriptor of the table + * @param regionInfo encoded name of the region + * @param tablePath the table directory + * @return The HDFS blocks distribution for the given region. 
+ * @throws IOException + */ + public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf, + final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath) + throws IOException { + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); FileSystem fs = tablePath.getFileSystem(conf); HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo); @@ -3890,11 +3905,36 @@ public class HRegion implements HeapSize { // , Writable{ final HLog hlog, final boolean initialize, final boolean ignoreHLog) throws IOException { + Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); + return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog); + } + + /** + * Convenience method creating new HRegions. Used by createTable. + * The {@link HLog} for the created region needs to be closed + * explicitly, if it is not null. + * Use {@link HRegion#getLog()} to get access. + * + * @param info Info for region to create. + * @param rootDir Root directory for HBase instance + * @param tableDir table directory + * @param conf + * @param hTableDescriptor + * @param hlog shared HLog + * @param initialize - true to initialize the region + * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable + * @return new HRegion + * @throws IOException + */ + public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir, + final Configuration conf, + final HTableDescriptor hTableDescriptor, + final HLog hlog, + final boolean initialize, final boolean ignoreHLog) + throws IOException { LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTable().getNameAsString()); - - Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); FileSystem fs = FileSystem.get(conf); HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info); HLog effectiveHLog = hlog; @@ -4050,15 +4090,39 @@ public class HRegion implements HeapSize { // , Writable{ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal, final RegionServerServices rsServices, final CancelableProgressable reporter) throws IOException { + Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); + return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter); + } + + /** + * Open a Region. + * @param conf The Configuration object to use. + * @param fs Filesystem to use + * @param rootDir Root directory for HBase instance + * @param info Info for region to be opened. + * @param htd the table descriptor + * @param wal HLog for region to use. This method will call + * HLog#setSequenceNumber(long) passing the result of the call to + * HRegion#getMinSequenceId() to ensure the log id is properly kept + * up. HRegionStore does this every time it opens a new region. + * @param rsServices An interface we can request flushes against. + * @param reporter An interface we can report progress against. 
+ * @return new HRegion + * @throws IOException + */ + public static HRegion openHRegion(final Configuration conf, final FileSystem fs, + final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal, + final RegionServerServices rsServices, final CancelableProgressable reporter) + throws IOException { if (info == null) throw new NullPointerException("Passed region info is null"); if (LOG.isDebugEnabled()) { LOG.debug("Opening region: " + info); } - Path dir = FSUtils.getTableDir(rootDir, info.getTable()); - HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices); + HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices); return r.openHRegion(reporter); } + /** * Useful when reopening a closed region (normally for unit tests) * @param other original object diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java index 34c277a..09f9d48 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.snapshot; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.HashMap; @@ -37,23 +37,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSVisitor; import org.apache.hadoop.hbase.util.ModifyRegionUtils; @@ -471,8 +472,9 @@ public class RestoreSnapshotHelper { } // create the regions on disk - ModifyRegionUtils.createRegions(conf, rootDir, + ModifyRegionUtils.createRegions(conf, rootDir, tableDir, tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() { + @Override public void fillRegion(final HRegion region) throws IOException { cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName())); } @@ -499,6 +501,7 @@ public class RestoreSnapshotHelper { final String tableName = tableDesc.getTableName().getNameAsString(); 
SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir, new FSVisitor.StoreFileVisitor() { + @Override public void storeFile (final String region, final String family, final String hfile) throws IOException { LOG.info("Adding HFileLink " + hfile + " to table=" + tableName); @@ -627,10 +630,13 @@ public class RestoreSnapshotHelper { private void restoreWALs() throws IOException { final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir, snapshotTable, regionsMap); + // TODO: use executors to parallelize splitting + // TODO: once split, we do not need to split again for other restores try { // Recover.Edits SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir, new FSVisitor.RecoveredEditsVisitor() { + @Override public void recoveredEdits (final String region, final String logfile) throws IOException { Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile); logSplitter.splitRecoveredEdit(path); @@ -639,6 +645,7 @@ public class RestoreSnapshotHelper { // Region Server Logs SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() { + @Override public void logFile (final String server, final String logfile) throws IOException { logSplitter.splitLog(server, logfile); } @@ -689,4 +696,45 @@ public class RestoreSnapshotHelper { } return htd; } + + /** + * Restore the snapshot for a snapshot scanner, discards meta changes. + * @param conf + * @param fs + * @param rootDir + * @param restoreDir + * @param snapshotName + * @throws IOException + */ + public static void restoreSnapshotForScanner(Configuration conf, FileSystem fs, Path rootDir, + Path restoreDir, String snapshotName) throws IOException { + // ensure that restore dir is not under root dir + if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) { + throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " + + "should be the same"); + } + if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) { + throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " + + "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir); + } + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + //load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir); + + MonitoredTask status = TaskMonitor.get().createStatus( + "Restoring snapshot '" + snapshotName + "' to directory " + restoreDir); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(); + + RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc, + snapshotDir, htd, restoreDir, monitor, status); + helper.restoreHdfsRegions(); // TODO: parallelize. 
+ + if (LOG.isDebugEnabled()) { + LOG.debug("Restored table dir:" + restoreDir); + FSUtils.logFileSystemState(fs, restoreDir, LOG); + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index 28b4250..b0fd3a5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -54,7 +54,7 @@ public abstract class AbstractHBaseTool implements Tool { protected Configuration conf = null; private static final Set requiredOptions = new TreeSet(); - + protected String[] cmdLineArgs = null; /** @@ -151,6 +151,11 @@ public abstract class AbstractHBaseTool implements Tool { addOptWithArg(opt, description); } + protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) { + requiredOptions.add(longOpt); + addOptWithArg(shortOpt, longOpt, description); + } + protected void addOptNoArg(String opt, String description) { options.addOption(opt, false, description); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index 9758946..369e707 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -84,6 +84,26 @@ public abstract class ModifyRegionUtils { public static List createRegions(final Configuration conf, final Path rootDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, final RegionFillTask task) throws IOException { + + Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName()); + return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task); + } + + /** + * Create new set of regions on the specified file-system. + * NOTE: that you should add the regions to hbase:meta after this operation. + * + * @param conf {@link Configuration} + * @param rootDir Root directory for HBase instance + * @param tableDir table directory + * @param hTableDescriptor description of the table + * @param newRegions {@link HRegionInfo} that describes the regions to create + * @param task {@link RegionFillTask} custom code to populate region after creation + * @throws IOException + */ + public static List createRegions(final Configuration conf, final Path rootDir, + final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, + final RegionFillTask task) throws IOException { if (newRegions == null) return null; int regionNumber = newRegions.length; ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf, @@ -93,21 +113,9 @@ public abstract class ModifyRegionUtils { List regionInfos = new ArrayList(); for (final HRegionInfo newRegion : newRegions) { completionService.submit(new Callable() { + @Override public HRegionInfo call() throws IOException { - // 1. Create HRegion - HRegion region = HRegion.createHRegion(newRegion, - rootDir, conf, hTableDescriptor, null, - false, true); - try { - // 2. Custom user code to interact with the created region - if (task != null) { - task.fillRegion(region); - } - } finally { - // 3. Close the new region to flush to disk. Close log file too. 
- region.close(); - } - return region.getRegionInfo(); + return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task); } }); } @@ -129,6 +137,35 @@ public abstract class ModifyRegionUtils { return regionInfos; } + /** + * Create new set of regions on the specified file-system. + * @param conf {@link Configuration} + * @param rootDir Root directory for HBase instance + * @param tableDir table directory + * @param hTableDescriptor description of the table + * @param newRegion {@link HRegionInfo} that describes the region to create + * @param task {@link RegionFillTask} custom code to populate region after creation + * @throws IOException + */ + public static HRegionInfo createRegion(final Configuration conf, final Path rootDir, + final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion, + final RegionFillTask task) throws IOException { + // 1. Create HRegion + HRegion region = HRegion.createHRegion(newRegion, + rootDir, tableDir, conf, hTableDescriptor, null, + false, true); + try { + // 2. Custom user code to interact with the created region + if (task != null) { + task.fillRegion(region); + } + } finally { + // 3. Close the new region to flush to disk. Close log file too. + region.close(); + } + return region.getRegionInfo(); + } + /* * used by createRegions() to get the thread pool executor based on the * "hbase.hregion.open.and.init.threads.max" property. @@ -142,6 +179,7 @@ public abstract class ModifyRegionUtils { new ThreadFactory() { private int count = 1; + @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, threadNamePrefix + "-" + count++); return t; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index d692adc..737bb8c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -163,6 +163,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * mini dfs. * @deprecated can be used only with mini dfs */ + @Deprecated private static final String TEST_DIRECTORY_KEY = "test.build.data"; /** Filesystem URI used for map-reduce mini-cluster setup */ @@ -1635,24 +1636,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @throws IOException */ public int loadTable(final HTable t, final byte[] f) throws IOException { - t.setAutoFlush(false, true); - byte[] k = new byte[3]; - int rowCount = 0; - for (byte b1 = 'a'; b1 <= 'z'; b1++) { - for (byte b2 = 'a'; b2 <= 'z'; b2++) { - for (byte b3 = 'a'; b3 <= 'z'; b3++) { - k[0] = b1; - k[1] = b2; - k[2] = b3; - Put put = new Put(k); - put.add(f, null, k); - t.put(put); - rowCount++; - } - } - } - t.flushCommits(); - return rowCount; + return loadTable(t, new byte[][] {f}); } /** @@ -1663,28 +1647,83 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @throws IOException */ public int loadTable(final HTable t, final byte[][] f) throws IOException { - t.setAutoFlush(false, true); - byte[] k = new byte[3]; + return loadTable(t, f, null); + } + + /** + * Load table of multiple column families with rows from 'aaa' to 'zzz'. + * @param t Table + * @param f Array of Families to load + * @param value the values of the cells. If null is passed, the row key is used as value + * @return Count of rows loaded. 
+ * @throws IOException + */ + public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException { + t.setAutoFlush(false); int rowCount = 0; - for (byte b1 = 'a'; b1 <= 'z'; b1++) { - for (byte b2 = 'a'; b2 <= 'z'; b2++) { - for (byte b3 = 'a'; b3 <= 'z'; b3++) { - k[0] = b1; - k[1] = b2; - k[2] = b3; - Put put = new Put(k); - for (int i = 0; i < f.length; i++) { - put.add(f[i], null, k); - } - t.put(put); - rowCount++; - } + for (byte[] row : HBaseTestingUtility.ROWS) { + Put put = new Put(row); + for (int i = 0; i < f.length; i++) { + put.add(f[i], null, value != null ? value : row); } + t.put(put); + rowCount++; } t.flushCommits(); return rowCount; } + /** A tracker for tracking and validating table rows + * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])} + */ + public static class SeenRowTracker { + int dim = 'z' - 'a' + 1; + int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen + byte[] startRow; + byte[] stopRow; + + public SeenRowTracker(byte[] startRow, byte[] stopRow) { + this.startRow = startRow; + this.stopRow = stopRow; + } + + void reset() { + for (byte[] row : ROWS) { + seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; + } + } + + int i(byte b) { + return b - 'a'; + } + + public void addRow(byte[] row) { + seenRows[i(row[0])][i(row[1])][i(row[2])]++; + } + + /** Validate that all the rows between startRow and stopRow are seen exactly once, and + * all other rows none + */ + public void validate() { + for (byte b1 = 'a'; b1 <= 'z'; b1++) { + for (byte b2 = 'a'; b2 <= 'z'; b2++) { + for (byte b3 = 'a'; b3 <= 'z'; b3++) { + int count = seenRows[i(b1)][i(b2)][i(b3)]; + int expectedCount = 0; + if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0 + && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) { + expectedCount = 1; + } + if (count != expectedCount) { + String row = new String(new byte[] {b1,b2,b3}); + throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount); + } + } + } + } + } + } + public int loadRegion(final HRegion r, final byte[] f) throws IOException { return loadRegion(r, f, false); } @@ -1796,6 +1835,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return createMultiRegions(getConfiguration(), table, columnFamily); } + /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */ + public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB + static { + int i = 0; + for (byte b1 = 'a'; b1 <= 'z'; b1++) { + for (byte b2 = 'a'; b2 <= 'z'; b2++) { + for (byte b3 = 'a'; b3 <= 'z'; b3++) { + ROWS[i][0] = b1; + ROWS[i][1] = b2; + ROWS[i][2] = b3; + i++; + } + } + } + } + public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), @@ -3229,6 +3284,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** + * Returns a {@link Predicate} for checking that table is enabled + */ + public Waiter.Predicate predicateTableEnabled(final TableName tableName) { + return new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return getHBaseAdmin().isTableEnabled(tableName); + } + }; + } + + /** * Create a set of column descriptors with the combination of compression, * encoding, bloom codecs available. 
* @return the list of column descriptors diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluationScan.java hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluationScan.java new file mode 100644 index 0000000..a260dab --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluationScan.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableSnapshotScanner; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Stopwatch; + +/** + * A simple performance evaluation tool for single client and MR scans + * and snapshot scans. + */ +public class PerformanceEvaluationScan extends AbstractHBaseTool { + + private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters"; + + private String type; + private String file; + private String tablename; + private String snapshotName; + private String restoreDir; + private String caching; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + Path rootDir; + try { + rootDir = FSUtils.getRootDir(conf); + rootDir.getFileSystem(conf); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void addOptions() { + this.addRequiredOptWithArg("t", "type", "the type of the test. 
One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce"); + this.addOptWithArg("f", "file", "the filename to read from"); + this.addOptWithArg("tn", "table", "the tablename to read from"); + this.addOptWithArg("sn", "snapshot", "the snapshot name to read from"); + this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot"); + this.addOptWithArg("ch", "caching", "scanner caching value"); + } + + @Override + protected void processOptions(CommandLine cmd) { + type = cmd.getOptionValue("type"); + file = cmd.getOptionValue("file"); + tablename = cmd.getOptionValue("table"); + snapshotName = cmd.getOptionValue("snapshot"); + restoreDir = cmd.getOptionValue("restoredir"); + caching = cmd.getOptionValue("caching"); + } + + protected void testHdfsStreaming(Path filename) throws IOException { + byte[] buf = new byte[1024]; + FileSystem fs = filename.getFileSystem(getConf()); + + // read the file from start to finish + Stopwatch fileOpenTimer = new Stopwatch(); + Stopwatch streamTimer = new Stopwatch(); + + fileOpenTimer.start(); + FSDataInputStream in = fs.open(filename); + fileOpenTimer.stop(); + + long totalBytes = 0; + streamTimer.start(); + while (true) { + int read = in.read(buf); + if (read < 0) { + break; + } + totalBytes += read; + } + streamTimer.stop(); + + double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS); + + System.out.println("HDFS streaming: "); + System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms"); + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + } + + private Scan getScan() { + Scan scan = new Scan(); // default scan settings + scan.setCacheBlocks(false); + scan.setMaxVersions(1); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + if (caching != null) { + scan.setCaching(Integer.parseInt(caching)); + } + + return scan; + } + + public void testScan() throws IOException { + Stopwatch tableOpenTimer = new Stopwatch(); + Stopwatch scanOpenTimer = new Stopwatch(); + Stopwatch scanTimer = new Stopwatch(); + + tableOpenTimer.start(); + HTable table = new HTable(getConf(), TableName.valueOf(tablename)); + tableOpenTimer.stop(); + + Scan scan = getScan(); + scanOpenTimer.start(); + ResultScanner scanner = table.getScanner(scan); + scanOpenTimer.stop(); + + long numRows = 0; + long numCells = 0; + scanTimer.start(); + while (true) { + Result result = scanner.next(); + if (result == null) { + break; + } + numRows++; + + numCells += result.rawCells().length; + } + scanTimer.stop(); + scanner.close(); + table.close(); + + ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA)); + long totalBytes = metrics.countOfBytesInResults.get(); + double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS); + + System.out.println("HBase scan: "); + System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to scan: " + 
scanTimer.elapsedMillis() + " ms"); + + System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + } + + + public void testSnapshotScan() throws IOException { + Stopwatch snapshotRestoreTimer = new Stopwatch(); + Stopwatch scanOpenTimer = new Stopwatch(); + Stopwatch scanTimer = new Stopwatch(); + + Path restoreDir = new Path(this.restoreDir); + + snapshotRestoreTimer.start(); + restoreDir.getFileSystem(conf).delete(restoreDir, true); + snapshotRestoreTimer.stop(); + + Scan scan = getScan(); + scanOpenTimer.start(); + TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); + scanOpenTimer.stop(); + + long numRows = 0; + long numCells = 0; + scanTimer.start(); + while (true) { + Result result = scanner.next(); + if (result == null) { + break; + } + numRows++; + + numCells += result.rawCells().length; + } + scanTimer.stop(); + scanner.close(); + + ScanMetrics metrics = scanner.getScanMetrics(); + long totalBytes = metrics.countOfBytesInResults.get(); + double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS); + + System.out.println("HBase scan snapshot: "); + System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms"); + System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms"); + + System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + + } + + public static enum ScanCounter { + NUM_ROWS, + NUM_CELLS, + } + + public static class MyMapper extends TableMapper { + @Override + protected void map(ImmutableBytesWritable key, Result value, + Context context) throws IOException, + InterruptedException { + context.getCounter(ScanCounter.NUM_ROWS).increment(1); + context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length); + } + } + + public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { + Stopwatch scanOpenTimer = new Stopwatch(); + Stopwatch scanTimer = new Stopwatch(); + + Scan scan = getScan(); + + String jobName = "testScanMapReduce"; + + Job job = new Job(conf); + job.setJobName(jobName); + + job.setJarByClass(getClass()); + + TableMapReduceUtil.initTableMapperJob( + this.tablename, + scan, + 
MyMapper.class, + NullWritable.class, + NullWritable.class, + job + ); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + scanTimer.start(); + job.waitForCompletion(true); + scanTimer.stop(); + + Counters counters = job.getCounters(); + long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); + long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); + + long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); + double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS); + + System.out.println("HBase scan mapreduce: "); + System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms"); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + } + + public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { + Stopwatch scanOpenTimer = new Stopwatch(); + Stopwatch scanTimer = new Stopwatch(); + + Scan scan = getScan(); + + String jobName = "testSnapshotScanMapReduce"; + + Job job = new Job(conf); + job.setJobName(jobName); + + job.setJarByClass(getClass()); + + TableMapReduceUtil.initTableSnapshotMapperJob( + this.snapshotName, + scan, + MyMapper.class, + NullWritable.class, + NullWritable.class, + job, + true, + new Path(restoreDir) + ); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + scanTimer.start(); + job.waitForCompletion(true); + scanTimer.stop(); + + Counters counters = job.getCounters(); + long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); + long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); + + long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); + double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS); + + System.out.println("HBase scan mapreduce: "); + System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms"); + System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms"); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " 
+    System.out.println("total cells : " + numCells);
+    System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    if (type.equals("streaming")) {
+      testHdfsStreaming(new Path(file));
+    } else if (type.equals("scan")) {
+      testScan();
+    } else if (type.equals("snapshotscan")) {
+      testSnapshotScan();
+    } else if (type.equals("scanmapreduce")) {
+      testScanMapReduce();
+    } else if (type.equals("snapshotscanmapreduce")) {
+      testSnapshotScanMapReduce();
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(HBaseConfiguration.create(), new PerformanceEvaluationScan(), args);
+    System.exit(ret);
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
new file mode 100644
index 0000000..34edae2
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotScanner {
+
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int NUM_REGION_SERVERS = 2;
+  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+  public static byte[] bbb = Bytes.toBytes("bbb");
+  public static byte[] yyy = Bytes.toBytes("yyy");
+
+  private FileSystem fs;
+  private Path rootDir;
+
+  public void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  public void tearDownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, int numRegions)
+      throws Exception {
+    try {
+      util.deleteTable(tableName);
+    } catch(Exception ex) {
+      // ignore
+    }
+
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+    HBaseAdmin admin = util.getHBaseAdmin();
+
+    // put some stuff in the table
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    util.loadTable(table, FAMILIES);
+
+    Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    util.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    admin.flush(tableName.toString());
+    table.close();
+  }
+
+  @Test
+  public void testWithSingleRegion() throws Exception {
+    testScanner(UTIL, "testWithSingleRegion", 1, false);
+  }
+
+  @Test
+  public void testWithMultiRegion() throws Exception {
+    testScanner(UTIL, "testWithMultiRegion", 10, false);
+  }
+
+  @Test
+  public void testWithOfflineHBaseMultiRegion() throws Exception {
+    testScanner(UTIL, "testWithMultiRegion", 20, true);
+  }
+
+  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions, boolean shutdownCluster)
+      throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testScanner");
+    try {
+      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+      if (shutdownCluster) {
+        util.shutdownMiniHBaseCluster();
+      }
+
+      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
+
+      verifyScanner(scanner, bbb, yyy);
+      scanner.close();
+    } finally {
+      if (!shutdownCluster) {
+        util.getHBaseAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+        tearDownCluster();
+      }
+    }
+  }
+
+  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
+      throws IOException, InterruptedException {
+
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    while (true) {
+      Result result = scanner.next();
+      if (result == null) {
+        break;
+      }
+      verifyRow(result);
+      rowTracker.addRow(result.getRow());
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  private static void verifyRow(Result result) throws IOException {
+    byte[] row = result.getRow();
+    CellScanner scanner = result.cellScanner();
+    while (scanner.advance()) {
+      Cell cell = scanner.current();
+
+      //assert that all Cells in the Result have the same key
+      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+
+    for (int j = 0; j < FAMILIES.length; j++) {
+      byte[] actual = result.getValue(FAMILIES[j], null);
+      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) +
+          " ,actual:" + Bytes.toString(actual), row, actual);
+    }
+  }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
new file mode 100644
index 0000000..17e23a0
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat {
+
+  private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int NUM_REGION_SERVERS = 2;
+  private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
+  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
+  public static byte[] bbb = Bytes.toBytes("bbb");
+  public static byte[] yyy = Bytes.toBytes("yyy");
+
+  private FileSystem fs;
+  private Path rootDir;
+
+  public void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(NUM_REGION_SERVERS);
+    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    fs = rootDir.getFileSystem(UTIL.getConfiguration());
+  }
+
+  public void tearDownCluster() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void setupConf(Configuration conf) {
+    // Enable snapshot
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testGetBestLocations() throws IOException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    Configuration conf = UTIL.getConfiguration();
+
+    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
+    Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution = new HDFSBlocksDistribution();
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
+    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
+    Assert.assertEquals(Lists.newArrayList("h1", "h2"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
+    Assert.assertEquals(Lists.newArrayList("h2", "h1"), tsif.getBestLocations(conf, blockDistribution));
+
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
+    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
+
+    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), tsif.getBestLocations(conf, blockDistribution));
+  }
+
+  public static enum TestTableSnapshotCounters {
+    VALIDATION_ERROR
+  }
+
+  public static class TestTableSnapshotMapper
+      extends TableMapper<ImmutableBytesWritable, NullWritable> {
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException, InterruptedException {
+      // Validate a single row coming from the snapshot, and emit the row key
+      verifyRowFromMap(key, value);
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  public static class TestTableSnapshotReducer
+      extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+    @Override
+    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+        Context context) throws IOException, InterruptedException {
+      rowTracker.addRow(key.get());
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      rowTracker.validate();
+    }
+  }
+
+  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, int numRegions)
+      throws Exception {
+    try {
+      util.deleteTable(tableName);
+    } catch(Exception ex) {
+      // ignore
+    }
+
+    if (numRegions > 1) {
+      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+    } else {
+      util.createTable(tableName, FAMILIES);
+    }
+    HBaseAdmin admin = util.getHBaseAdmin();
+
+    // put some stuff in the table
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    util.loadTable(table, FAMILIES);
+
+    Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+    // load different values
+    byte[] value = Bytes.toBytes("after_snapshot_value");
+    util.loadTable(table, FAMILIES, value);
+
+    // cause flush to create new files in the region
+    admin.flush(tableName.toString());
+    table.close();
+  }
+
+  @Test
+  public void testWithMockedMapReduceSingleRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+  }
+
+  @Test
+  public void testWithMockedMapReduceMultiRegion() throws Exception {
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+  }
+
+  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
+      throws Exception {
+    setupCluster();
+    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
+    try {
+      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+      Job job = new Job(util.getConfiguration());
+      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+          NullWritable.class, job, false, tmpTableDir);
+
+      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+
+    } finally {
+      util.getHBaseAdmin().deleteSnapshot(snapshotName);
+      util.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+      byte[] startRow, byte[] stopRow)
+      throws IOException, InterruptedException {
+    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+    List<InputSplit> splits = tsif.getSplits(job);
+
+    Assert.assertEquals(expectedNumSplits, splits.size());
+
+    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+    for (int i = 0; i < splits.size(); i++) {
+      // validate input split
+      InputSplit split = splits.get(i);
+      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+      // validate record reader
+      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+      RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
+      rr.initialize(split, taskAttemptContext);
+
+      // validate we can read all the data back
+      while (rr.nextKeyValue()) {
+        byte[] row = rr.getCurrentKey().get();
+        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+        rowTracker.addRow(row);
+      }
+
+      rr.close();
+    }
+
+    // validate all rows are seen
+    rowTracker.validate();
+  }
+
+  public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
+    byte[] row = key.get();
+    CellScanner scanner = result.cellScanner();
+    while (scanner.advance()) {
+      Cell cell = scanner.current();
+
+      //assert that all Cells in the Result have the same key
+      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    }
+
+    for (int j = 0; j < FAMILIES.length; j++) {
+      byte[] actual = result.getValue(FAMILIES[j], null);
+      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) +
+          " ,actual:" + Bytes.toString(actual), row, actual);
+    }
+  }
+
+  @Test
+  public void testWithMapReduceSingleRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+  }
+
+  @Test
+  public void testWithMapReduceMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+  }
+
+  @Test
+  // run the MR job while HBase is offline
+  public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+    testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+  }
+
+  private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+      int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+    setupCluster();
+    util.startMiniMapReduceCluster();
+    try {
+      Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
+      TableName tableName = TableName.valueOf("testWithMapReduce");
+      doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
+          expectedNumSplits, shutdownCluster);
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      tearDownCluster();
+    }
+  }
+
+  // this is also called by the IntegrationTestTableSnapshotInputFormat
+  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
+      throws Exception {
+
+    //create the table and snapshot
+    createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+    if (shutdownCluster) {
+      util.shutdownMiniHBaseCluster();
+    }
+
+    try {
+      // create the job
+      Job job = new Job(util.getConfiguration());
+      Scan scan = new Scan(bbb, yyy); // limit the scan
+
+      job.setJarByClass(util.getClass());
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+          NullWritable.class, job, true, tableDir);
+
+      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+      job.setNumReduceTasks(1);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      Assert.assertTrue(job.waitForCompletion(true));
+    } finally {
+      if (!shutdownCluster) {
+        util.getHBaseAdmin().deleteSnapshot(snapshotName);
+        util.deleteTable(tableName);
+      }
+    }
+  }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
index b8ed519..7c76d3e 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
@@ -664,7 +664,7 @@ public class TestRegionPlacement {
 
   /**
    * Create a table with specified table name and region number.
-   * @param table
+   * @param tablename
    * @param regionNum
    * @return
    * @throws IOException