diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 16de7c9..beaf5ee 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* Helper class for custom client scanners.
@@ -28,6 +31,49 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class AbstractClientScanner implements ResultScanner {
+ protected ScanMetrics scanMetrics;
+
+ /**
+ * Check whether the application wants to collect scan metrics and, if so, initialize them
+ */
+ protected void initScanMetrics(Scan scan) {
+ // check if application wants to collect scan metrics
+ byte[] enableMetrics = scan.getAttribute(
+ Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+ if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
+ scanMetrics = new ScanMetrics();
+ }
+ }
+
+ // TODO: should this be at ResultScanner? ScanMetrics is not public API it seems.
+ public ScanMetrics getScanMetrics() {
+ return scanMetrics;
+ }
+
+ /**
+ * Get nbRows rows.
+ * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+ * setting (or hbase.client.scanner.caching in hbase-site.xml).
+ * @param nbRows number of rows to return
+ * @return Between zero and nbRows RowResults. Scan is done
+ * if returned array is of zero-length (We never return null).
+ * @throws IOException
+ */
+ @Override
+ public Result [] next(int nbRows) throws IOException {
+ // Collect values to be returned here
+ ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+ for(int i = 0; i < nbRows; i++) {
+ Result next = next();
+ if (next != null) {
+ resultSets.add(next);
+ } else {
+ break;
+ }
+ }
+ return resultSets.toArray(new Result[resultSets.size()]);
+ }
+
@Override
public Iterator<Result> iterator() {
return new Iterator<Result>() {
@@ -38,6 +84,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
// this method is where the actual advancing takes place, but you need
// to call next() to consume it. hasNext() will only advance if there
// isn't a pending next().
+ @Override
public boolean hasNext() {
if (next == null) {
try {
@@ -52,6 +99,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
// get the pending next item and advance the iterator. returns null if
// there is no next item.
+ @Override
public Result next() {
// since hasNext() does the real advancing, we call this to determine
// if there is a next before proceeding.
@@ -67,6 +115,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
return temp;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException();
}
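
Reviewer note: a minimal sketch (not part of the patch) of how a client would exercise the next(int) and scan-metrics code hoisted into AbstractClientScanner above. The table name, column family-free scan, and batch size are illustrative; the cast to AbstractClientScanner reflects the TODO above about ScanMetrics not being public API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.AbstractClientScanner;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanMetricsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "exampleTable");        // illustrative table name
        Scan scan = new Scan();
        scan.setCaching(100);                                    // rows fetched per scanner RPC
        // opt in to scan metrics; initScanMetrics() above checks this attribute
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
        ResultScanner scanner = table.getScanner(scan);
        try {
          // next(int) now lives in AbstractClientScanner; a zero-length array means the scan is done
          for (Result[] batch = scanner.next(100); batch.length > 0; batch = scanner.next(100)) {
            // process the batch
          }
          ScanMetrics metrics = ((AbstractClientScanner) scanner).getScanMetrics();
          System.out.println("bytes in results: " + metrics.countOfBytesInResults.get());
        } finally {
          scanner.close();
          table.close();
        }
      }
    }
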
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index bc83170..c441c30 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -62,7 +60,6 @@ public class ClientScanner extends AbstractClientScanner {
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
protected Result lastResult = null;
- protected ScanMetrics scanMetrics = null;
protected final long maxScannerResultSize;
private final HConnection connection;
private final TableName tableName;
@@ -151,11 +148,7 @@ public class ClientScanner extends AbstractClientScanner {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
// check if application wants to collect scan metrics
- byte[] enableMetrics = scan.getAttribute(
- Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
- if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
- scanMetrics = new ScanMetrics();
- }
+ initScanMetrics(scan);
// Use the caching from the Scan. If not set, use the default cache setting for this table.
if (this.scan.getCaching() > 0) {
@@ -170,7 +163,7 @@ public class ClientScanner extends AbstractClientScanner {
initializeScannerInConstruction();
}
-
+
protected void initializeScannerInConstruction() throws IOException{
// initialize the scanner
nextScanner(this.caching, false);
@@ -429,30 +422,6 @@ public class ClientScanner extends AbstractClientScanner {
return null;
}
- /**
- * Get nbRows rows.
- * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
- * setting (or hbase.client.scanner.caching in hbase-site.xml).
- * @param nbRows number of rows to return
- * @return Between zero and nbRows RowResults. Scan is done
- * if returned array is of zero-length (We never return null).
- * @throws IOException
- */
- @Override
- public Result [] next(int nbRows) throws IOException {
- // Collect values to be returned here
- ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
- }
- return resultSets.toArray(new Result[resultSets.size()]);
- }
-
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 426572f..9fa602b 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -82,7 +82,7 @@ public final class CellUtil {
copyValueTo(cell, output, 0);
return output;
}
-
+
public static byte[] getTagArray(Cell cell){
byte[] output = new byte[cell.getTagsLength()];
copyTagTo(cell, output, 0);
@@ -370,4 +370,16 @@ public final class CellUtil {
// Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
Bytes.SIZEOF_INT;
}
+
+ /**
+ * Returns true if the first range start1...end1 overlaps with the second range
+ * start2...end2, assuming the byte arrays represent row keys
+ */
+ public static boolean overlappingKeys(final byte[] start1, final byte[] end1,
+ final byte[] start2, final byte[] end2) {
+ return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1,
+ end2) < 0)
+ && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
+ end1) < 0);
+ }
}
diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
new file mode 100644
index 0000000..d031ea1
--- /dev/null
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCellUtil {
+
+ @Test
+ public void testOverlappingKeys() {
+ byte[] empty = HConstants.EMPTY_BYTE_ARRAY;
+ byte[] a = Bytes.toBytes("a");
+ byte[] b = Bytes.toBytes("b");
+ byte[] c = Bytes.toBytes("c");
+ byte[] d = Bytes.toBytes("d");
+
+ Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, b));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, c, a, b));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, b, a, c));
+ Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, c));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, c, b, c));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, d, b, c));
+ Assert.assertTrue(CellUtil.overlappingKeys(b, c, a, d));
+
+ Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, b));
+ Assert.assertTrue(CellUtil.overlappingKeys(empty, b, a, c));
+
+ Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, b));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, c));
+
+ Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, b));
+ Assert.assertTrue(CellUtil.overlappingKeys(a, empty, a, c));
+
+ Assert.assertTrue(CellUtil.overlappingKeys(a, b, empty, empty));
+ Assert.assertTrue(CellUtil.overlappingKeys(empty, empty, a, b));
+ }
+}
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
new file mode 100644
index 0000000..4ff7a82
--- /dev/null
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestBase;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.IntegrationTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+/**
+ * An integration test for {@link TableSnapshotInputFormat}, which enables
+ * reading directly from snapshot files without going through hbase servers.
+ *
+ * This test creates a table and loads it with rows ranging from
+ * 'aaa' to 'zzz', and for each row, sets the columns f1:(null) and f2:(null) to be
+ * the same as the row value.
+ *
+ *
+ * Then the test creates a snapshot from this table, and overwrites the values in the original
+ * table with the value 'after_snapshot_value'. The test then runs a mapreduce job over the snapshot
+ * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output file and
+ * inspected later to verify that the MR job has seen all the values from the snapshot.
+ *
+ *
+ * These parameters can be used to configure the job:
+ * "IntegrationTestTableSnapshotInputFormat.table" => the name of the table
+ * "IntegrationTestTableSnapshotInputFormat.snapshot" => the name of the snapshot
+ * "IntegrationTestTableSnapshotInputFormat.numRegions" => number of regions in the table to be created
+ * "IntegrationTestTableSnapshotInputFormat.tableDir" => temporary directory to restore the snapshot files
+ *
+ */
+@Category(IntegrationTests.class)
+// Not runnable as a unit test. See TestTableSnapshotInputFormat
+public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestTableSnapshotInputFormat.class);
+
+ private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
+ private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";
+
+ private static final String SNAPSHOT_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.snapshot";
+
+
+ private static final String NUM_REGIONS_KEY = "IntegrationTestTableSnapshotInputFormat.numRegions";
+ private static final int DEFAULT_NUM_REGIONS = 32;
+
+ private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
+
+ private IntegrationTestingUtility util;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ util = getTestingUtil(conf);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ util = getTestingUtil(getConf());
+ util.initializeCluster(1);
+ this.setConf(util.getConfiguration());
+ }
+
+ @Override
+ @After
+ public void cleanUp() throws Exception {
+ util.restoreCluster();
+ }
+
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ Configuration conf = getConf();
+ TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
+ String snapshotName = conf.get(SNAPSHOT_NAME_KEY, tableName.getQualifierAsString()
+ + "_snapshot_" + System.currentTimeMillis());
+ int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
+ String tableDirStr = conf.get(TABLE_DIR_KEY);
+ Path tableDir;
+ if (tableDirStr == null) {
+ tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
+ } else {
+ tableDir = new Path(tableDirStr);
+ }
+
+ /* We create the table using HBaseAdmin#createTable(), which will create the table
+ * with the desired number of regions. We pass bbb as startKey and yyy as endKey, so if
+ * numRegions is > 2, we create the boundary regions [empty, bbb) and [yyy, empty), plus
+ * numRegions - 2 regions between bbb and yyy. The test uses a Scan with startRow
+ * bbb and endRow yyy, so we expect the first and last region to be filtered out in
+ * the input format, and we expect numRegions - 2 splits between bbb and yyy.
+ */
+ int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
+
+ TestTableSnapshotInputFormat.doTestWithMapReduce(util, tableName, snapshotName, tableDir,
+ numRegions, expectedNumSplits, false);
+
+ return 0;
+ }
+
+ @Override // CM is not intended to be run with this test
+ public String getTablename() {
+ return null;
+ }
+
+ @Override
+ protected Set<String> getColumnFamilies() {
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
+ System.exit(ret);
+ }
+
+}
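
Reviewer note: the integration test above is normally launched through its main(), so here is a hedged sketch of driving it programmatically with the configuration keys documented in the class javadoc; the launcher class name and all values are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.IntegrationTestingUtility;
    import org.apache.hadoop.hbase.mapreduce.IntegrationTestTableSnapshotInputFormat;
    import org.apache.hadoop.util.ToolRunner;

    public class SnapshotInputFormatITLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // keys documented in the class javadoc above; values are illustrative
        conf.set("IntegrationTestTableSnapshotInputFormat.table", "ITSnapshotTable");
        conf.set("IntegrationTestTableSnapshotInputFormat.snapshot", "ITSnapshotTable_snap");
        conf.setInt("IntegrationTestTableSnapshotInputFormat.numRegions", 8);
        IntegrationTestingUtility.setUseDistributedCluster(conf);
        // runs runTestFromCommandLine() against the distributed cluster
        int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), new String[0]);
        System.exit(ret);
      }
    }
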
diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java
index fb617fb..1457fe8 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MapReduceProtos.java
@@ -717,11 +717,764 @@ public final class MapReduceProtos {
// @@protoc_insertion_point(class_scope:ScanMetrics)
}
+ public interface TableSnapshotRegionSplitOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .RegionSpecifier region = 1;
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ boolean hasRegion();
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion();
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder();
+
+ // repeated string locations = 2;
+ /**
+ * repeated string locations = 2;
+ */
+ java.util.List<java.lang.String>
+ getLocationsList();
+ /**
+ * repeated string locations = 2;
+ */
+ int getLocationsCount();
+ /**
+ * repeated string locations = 2;
+ */
+ java.lang.String getLocations(int index);
+ /**
+ * repeated string locations = 2;
+ */
+ com.google.protobuf.ByteString
+ getLocationsBytes(int index);
+ }
+ /**
+ * Protobuf type {@code TableSnapshotRegionSplit}
+ */
+ public static final class TableSnapshotRegionSplit extends
+ com.google.protobuf.GeneratedMessage
+ implements TableSnapshotRegionSplitOrBuilder {
+ // Use TableSnapshotRegionSplit.newBuilder() to construct.
+ private TableSnapshotRegionSplit(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private TableSnapshotRegionSplit(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final TableSnapshotRegionSplit defaultInstance;
+ public static TableSnapshotRegionSplit getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TableSnapshotRegionSplit getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private TableSnapshotRegionSplit(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder subBuilder = null;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ subBuilder = region_.toBuilder();
+ }
+ region_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.PARSER, extensionRegistry);
+ if (subBuilder != null) {
+ subBuilder.mergeFrom(region_);
+ region_ = subBuilder.buildPartial();
+ }
+ bitField0_ |= 0x00000001;
+ break;
+ }
+ case 18: {
+ if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ locations_ = new com.google.protobuf.LazyStringArrayList();
+ mutable_bitField0_ |= 0x00000002;
+ }
+ locations_.add(input.readBytes());
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ locations_ = new com.google.protobuf.UnmodifiableLazyStringList(locations_);
+ }
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<TableSnapshotRegionSplit> PARSER =
+ new com.google.protobuf.AbstractParser<TableSnapshotRegionSplit>() {
+ public TableSnapshotRegionSplit parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new TableSnapshotRegionSplit(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<TableSnapshotRegionSplit> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // optional .RegionSpecifier region = 1;
+ public static final int REGION_FIELD_NUMBER = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_;
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public boolean hasRegion() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() {
+ return region_;
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() {
+ return region_;
+ }
+
+ // repeated string locations = 2;
+ public static final int LOCATIONS_FIELD_NUMBER = 2;
+ private com.google.protobuf.LazyStringList locations_;
+ /**
+ * repeated string locations = 2;
+ */
+ public java.util.List<java.lang.String>
+ getLocationsList() {
+ return locations_;
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public int getLocationsCount() {
+ return locations_.size();
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public java.lang.String getLocations(int index) {
+ return locations_.get(index);
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public com.google.protobuf.ByteString
+ getLocationsBytes(int index) {
+ return locations_.getByteString(index);
+ }
+
+ private void initFields() {
+ region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
+ locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (hasRegion()) {
+ if (!getRegion().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(1, region_);
+ }
+ for (int i = 0; i < locations_.size(); i++) {
+ output.writeBytes(2, locations_.getByteString(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(1, region_);
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < locations_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(locations_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getLocationsList().size();
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) obj;
+
+ boolean result = true;
+ result = result && (hasRegion() == other.hasRegion());
+ if (hasRegion()) {
+ result = result && getRegion()
+ .equals(other.getRegion());
+ }
+ result = result && getLocationsList()
+ .equals(other.getLocationsList());
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasRegion()) {
+ hash = (37 * hash) + REGION_FIELD_NUMBER;
+ hash = (53 * hash) + getRegion().hashCode();
+ }
+ if (getLocationsCount() > 0) {
+ hash = (37 * hash) + LOCATIONS_FIELD_NUMBER;
+ hash = (53 * hash) + getLocationsList().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code TableSnapshotRegionSplit}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplitOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.class, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getRegionFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (regionBuilder_ == null) {
+ region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
+ } else {
+ regionBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.internal_static_TableSnapshotRegionSplit_descriptor;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit build() {
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit result = new org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ if (regionBuilder_ == null) {
+ result.region_ = region_;
+ } else {
+ result.region_ = regionBuilder_.build();
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ locations_ = new com.google.protobuf.UnmodifiableLazyStringList(
+ locations_);
+ bitField0_ = (bitField0_ & ~0x00000002);
+ }
+ result.locations_ = locations_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit.getDefaultInstance()) return this;
+ if (other.hasRegion()) {
+ mergeRegion(other.getRegion());
+ }
+ if (!other.locations_.isEmpty()) {
+ if (locations_.isEmpty()) {
+ locations_ = other.locations_;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ } else {
+ ensureLocationsIsMutable();
+ locations_.addAll(other.locations_);
+ }
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (hasRegion()) {
+ if (!getRegion().isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional .RegionSpecifier region = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder> regionBuilder_;
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public boolean hasRegion() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier getRegion() {
+ if (regionBuilder_ == null) {
+ return region_;
+ } else {
+ return regionBuilder_.getMessage();
+ }
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public Builder setRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) {
+ if (regionBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ region_ = value;
+ onChanged();
+ } else {
+ regionBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public Builder setRegion(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder builderForValue) {
+ if (regionBuilder_ == null) {
+ region_ = builderForValue.build();
+ onChanged();
+ } else {
+ regionBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public Builder mergeRegion(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier value) {
+ if (regionBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ region_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance()) {
+ region_ =
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.newBuilder(region_).mergeFrom(value).buildPartial();
+ } else {
+ region_ = value;
+ }
+ onChanged();
+ } else {
+ regionBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public Builder clearRegion() {
+ if (regionBuilder_ == null) {
+ region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
+ onChanged();
+ } else {
+ regionBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder getRegionBuilder() {
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return getRegionFieldBuilder().getBuilder();
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder getRegionOrBuilder() {
+ if (regionBuilder_ != null) {
+ return regionBuilder_.getMessageOrBuilder();
+ } else {
+ return region_;
+ }
+ }
+ /**
+ * optional .RegionSpecifier region = 1;
+ */
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>
+ getRegionFieldBuilder() {
+ if (regionBuilder_ == null) {
+ regionBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifierOrBuilder>(
+ region_,
+ getParentForChildren(),
+ isClean());
+ region_ = null;
+ }
+ return regionBuilder_;
+ }
+
+ // repeated string locations = 2;
+ private com.google.protobuf.LazyStringList locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ private void ensureLocationsIsMutable() {
+ if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+ locations_ = new com.google.protobuf.LazyStringArrayList(locations_);
+ bitField0_ |= 0x00000002;
+ }
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public java.util.List<java.lang.String>
+ getLocationsList() {
+ return java.util.Collections.unmodifiableList(locations_);
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public int getLocationsCount() {
+ return locations_.size();
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public java.lang.String getLocations(int index) {
+ return locations_.get(index);
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public com.google.protobuf.ByteString
+ getLocationsBytes(int index) {
+ return locations_.getByteString(index);
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public Builder setLocations(
+ int index, java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureLocationsIsMutable();
+ locations_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public Builder addLocations(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureLocationsIsMutable();
+ locations_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public Builder addAllLocations(
+ java.lang.Iterable<java.lang.String> values) {
+ ensureLocationsIsMutable();
+ super.addAll(values, locations_);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public Builder clearLocations() {
+ locations_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string locations = 2;
+ */
+ public Builder addLocationsBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureLocationsIsMutable();
+ locations_.add(value);
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:TableSnapshotRegionSplit)
+ }
+
+ static {
+ defaultInstance = new TableSnapshotRegionSplit(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:TableSnapshotRegionSplit)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_ScanMetrics_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ScanMetrics_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_TableSnapshotRegionSplit_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_TableSnapshotRegionSplit_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -732,9 +1485,11 @@ public final class MapReduceProtos {
static {
java.lang.String[] descriptorData = {
"\n\017MapReduce.proto\032\013HBase.proto\".\n\013ScanMe" +
- "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64PairBB" +
- "\n*org.apache.hadoop.hbase.protobuf.gener" +
- "atedB\017MapReduceProtosH\001\240\001\001"
+ "trics\022\037\n\007metrics\030\001 \003(\0132\016.NameInt64Pair\"O" +
+ "\n\030TableSnapshotRegionSplit\022 \n\006region\030\001 \001" +
+ "(\0132\020.RegionSpecifier\022\021\n\tlocations\030\002 \003(\tB" +
+ "B\n*org.apache.hadoop.hbase.protobuf.gene" +
+ "ratedB\017MapReduceProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -747,6 +1502,12 @@ public final class MapReduceProtos {
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanMetrics_descriptor,
new java.lang.String[] { "Metrics", });
+ internal_static_TableSnapshotRegionSplit_descriptor =
+ getDescriptor().getMessageTypes().get(1);
+ internal_static_TableSnapshotRegionSplit_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_TableSnapshotRegionSplit_descriptor,
+ new java.lang.String[] { "Region", "Locations", });
return null;
}
};
diff --git hbase-protocol/src/main/protobuf/MapReduce.proto hbase-protocol/src/main/protobuf/MapReduce.proto
index e9ef17a..0c4c29e 100644
--- hbase-protocol/src/main/protobuf/MapReduce.proto
+++ hbase-protocol/src/main/protobuf/MapReduce.proto
@@ -18,15 +18,18 @@
//This file includes protocol buffers used in MapReduce only.
- option java_package = "org.apache.hadoop.hbase.protobuf.generated";
- option java_outer_classname = "MapReduceProtos";
- option java_generate_equals_and_hash = true;
- option optimize_for = SPEED;
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "MapReduceProtos";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
- import "HBase.proto";
+import "HBase.proto";
- message ScanMetrics {
+message ScanMetrics {
+ repeated NameInt64Pair metrics = 1;
+}
- repeated NameInt64Pair metrics = 1;
-
- }
\ No newline at end of file
+message TableSnapshotRegionSplit {
+ optional RegionSpecifier region = 1;
+ repeated string locations = 2;
+}
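
Reviewer note: a small sketch of round-tripping the new TableSnapshotRegionSplit message through the generated builder API added to MapReduceProtos above; the encoded region name and host names are placeholders.

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
    import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;

    public class SplitProtoSketch {
      public static void main(String[] args) throws Exception {
        RegionSpecifier region = RegionSpecifier.newBuilder()
            .setType(RegionSpecifier.RegionSpecifierType.ENCODED_REGION_NAME)
            .setValue(ByteString.copyFromUtf8("1588230740"))   // illustrative encoded region name
            .build();
        TableSnapshotRegionSplit split = TableSnapshotRegionSplit.newBuilder()
            .setRegion(region)
            .addLocations("host1.example.com")                 // preferred block locations
            .addLocations("host2.example.com")
            .build();
        // serialize and parse back, as an InputSplit implementation would do
        byte[] wire = split.toByteArray();
        TableSnapshotRegionSplit copy = TableSnapshotRegionSplit.parseFrom(wire);
        System.out.println(copy.getLocationsCount());          // prints 2
      }
    }
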
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
index 11619d3..88d7d3f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/HDFSBlocksDistribution.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
public class HDFSBlocksDistribution {
private Map<String,HostAndWeight> hostAndWeights = null;
private long uniqueBlocksTotalWeight = 0;
-
+
/**
* Stores the hostname and weight for that hostname.
*
@@ -58,7 +58,7 @@ public class HDFSBlocksDistribution {
* Constructor
* @param host the host name
* @param weight the weight
- */
+ */
public HostAndWeight(String host, long weight) {
this.host = host;
this.weight = weight;
@@ -67,28 +67,28 @@ public class HDFSBlocksDistribution {
/**
* add weight
* @param weight the weight
- */
+ */
public void addWeight(long weight) {
this.weight += weight;
}
/**
* @return the host name
- */
+ */
public String getHost() {
return host;
}
/**
* @return the weight
- */
+ */
public long getWeight() {
return weight;
}
/**
* comparator used to sort hosts based on weight
- */
+ */
public static class WeightComparator implements Comparator<HostAndWeight> {
@Override
public int compare(HostAndWeight l, HostAndWeight r) {
@@ -99,7 +99,7 @@ public class HDFSBlocksDistribution {
}
}
}
-
+
/**
* Constructor
*/
@@ -137,12 +137,12 @@ public class HDFSBlocksDistribution {
/**
* add some weight to the total unique weight
* @param weight the weight
- */
+ */
private void addUniqueWeight(long weight) {
uniqueBlocksTotalWeight += weight;
}
-
-
+
+
/**
* add some weight to a specific host
* @param host the host name
@@ -186,14 +186,14 @@ public class HDFSBlocksDistribution {
}
return weight;
}
-
+
/**
* @return the sum of all unique blocks' weight
*/
public long getUniqueBlocksTotalWeight() {
return uniqueBlocksTotalWeight;
}
-
+
/**
* return the locality index of a given host
* @param host the host name
@@ -207,8 +207,8 @@ public class HDFSBlocksDistribution {
}
return localityIndex;
}
-
-
+
+
/**
* This will add the distribution from input to this object
* @param otherBlocksDistribution the other hdfs blocks distribution
@@ -223,19 +223,27 @@ public class HDFSBlocksDistribution {
}
addUniqueWeight(otherBlocksDistribution.getUniqueBlocksTotalWeight());
}
-
+
/**
* return the sorted list of hosts in terms of their weights
*/
public List<String> getTopHosts() {
- NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
- new HostAndWeight.WeightComparator());
- orderedHosts.addAll(this.hostAndWeights.values());
- List<String> topHosts = new ArrayList<String>(orderedHosts.size());
- for(HostAndWeight haw : orderedHosts.descendingSet()) {
+ HostAndWeight[] hostAndWeights = getTopHostsWithWeights();
+ List<String> topHosts = new ArrayList<String>(hostAndWeights.length);
+ for(HostAndWeight haw : hostAndWeights) {
topHosts.add(haw.getHost());
}
return topHosts;
}
+ /**
+ * return the hosts and their weights, sorted by weight in descending order
+ */
+ public HostAndWeight[] getTopHostsWithWeights() {
+ NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
+ new HostAndWeight.WeightComparator());
+ orderedHosts.addAll(this.hostAndWeights.values());
+ return orderedHosts.descendingSet().toArray(new HostAndWeight[orderedHosts.size()]);
+ }
+
}
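
Reviewer note: a sketch of the new getTopHostsWithWeights() accessor, assuming the existing addHostsAndBlockWeight(String[], long) helper (not shown in this hunk) is used to populate the distribution; host names and block sizes are made up.

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;
    import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;

    public class TopHostsSketch {
      public static void main(String[] args) {
        HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
        // one 128MB block replicated on three hosts, a 64MB block on only two of them
        dist.addHostsAndBlockWeight(new String[] {"host1", "host2", "host3"}, 128L * 1024 * 1024);
        dist.addHostsAndBlockWeight(new String[] {"host1", "host2"}, 64L * 1024 * 1024);
        // new accessor: hosts sorted heaviest first, keeping their weights
        for (HostAndWeight haw : dist.getTopHostsWithWeights()) {
          System.out.println(haw.getHost() + " => " + haw.getWeight());
        }
      }
    }
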
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
new file mode 100644
index 0000000..588bd55
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.mortbay.log.Log;
+
+/**
+ * A client scanner for a region opened read-only on the client side. Assumes region data
+ * is not changing.
+ */
+@InterfaceAudience.Private
+public class ClientSideRegionScanner extends AbstractClientScanner {
+
+ private HRegion region;
+ private Scan scan;
+ RegionScanner scanner;
+ List<Cell> values;
+ Result result = null;
+
+ public ClientSideRegionScanner(Configuration conf, FileSystem fs,
+ Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
+
+ this.scan = scan;
+
+ // region is immutable, set isolation level
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+
+ // open region from the snapshot directory
+ this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+
+ // create an internal region scanner
+ this.scanner = region.getScanner(scan);
+ values = new ArrayList<Cell>();
+
+ if (scanMetrics == null) {
+ initScanMetrics(scan);
+ } else {
+ this.scanMetrics = scanMetrics;
+ }
+ region.startRegionOperation();
+ }
+
+ @Override
+ public Result next() throws IOException {
+ values.clear();
+
+ scanner.nextRaw(values, -1); // pass -1 as limit so that we see the whole row.
+ if (values == null || values.isEmpty()) {
+ //we are done
+ return null;
+ }
+
+ this.result = Result.create(values);
+ if (this.scanMetrics != null) {
+ long resultSize = 0;
+ for (Cell kv : values) {
+ // TODO add getLength to Cell/use CellUtil#estimatedSizeOf
+ resultSize += KeyValueUtil.ensureKeyValue(kv).getLength();
+ }
+ this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+ }
+
+ return result;
+ }
+
+ @Override
+ public void close() {
+ if (this.scanner != null) {
+ try {
+ this.scanner.close();
+ this.scanner = null;
+ } catch (IOException ex) {
+ Log.warn("Exception while closing scanner", ex);
+ }
+ }
+ if (this.region != null) {
+ this.region.closeRegionOperation();
+ try {
+ this.region.close(true);
+ this.region = null;
+ } catch (IOException ex) {
+ Log.warn("Exception while closing region", ex);
+ }
+ }
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
new file mode 100644
index 0000000..4a2f355
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+
+/**
+ * A Scanner which performs a scan over snapshot files. Using this class requires restoring the
+ * snapshot into a temporary empty directory, which copies the snapshot reference files into that
+ * directory. Actual data files are not copied.
+ *
+ *
+ * This also allows the scan to be run against an
+ * online or offline hbase cluster. The snapshot files can be exported to a pure-hdfs cluster
+ * with the {@link ExportSnapshot} tool, and this scanner can then be used to
+ * run the scan directly over the snapshot files.
+ *
+ *
+ * An internal RegionScanner is used to execute the {@link Scan} obtained
+ * from the user for each region in the snapshot.
+ *
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
+ * snapshot files and data files. HBase also enforces security because all the requests are handled
+ * by the server layer, and the user cannot read from the data files directly. To read snapshot
+ * files directly from the file system, the user running the MR job must have sufficient
+ * permissions to access snapshot and reference files. This means that to run mapreduce over
+ * snapshot files, the job has to be run as the HBase user or the user must have group or other
+ * privileges in the filesystem (See HBASE-8369). Note that granting other users read access to
+ * snapshot/data files will completely circumvent the access control enforced by HBase.
+ * @see TableSnapshotInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TableSnapshotScanner extends AbstractClientScanner {
+
+ private static final Log LOG = LogFactory.getLog(TableSnapshotScanner.class);
+
+ private Configuration conf;
+ private String snapshotName;
+ private FileSystem fs;
+ private Path rootDir;
+ private Path restoreDir;
+ private Scan scan;
+ private ArrayList<HRegionInfo> regions;
+ private HTableDescriptor htd;
+
+ private ClientSideRegionScanner currentRegionScanner = null;
+ private int currentRegion = -1;
+
+ public TableSnapshotScanner(Configuration conf, Path restoreDir,
+ String snapshotName, Scan scan) throws IOException {
+ this(conf, new Path(conf.get(HConstants.HBASE_DIR)),
+ restoreDir, snapshotName, scan);
+ }
+
+ public TableSnapshotScanner(Configuration conf, Path rootDir,
+ Path restoreDir, String snapshotName, Scan scan) throws IOException {
+ this.conf = conf;
+ this.snapshotName = snapshotName;
+ this.rootDir = rootDir;
+ this.restoreDir = restoreDir;
+ this.scan = scan;
+ this.fs = rootDir.getFileSystem(conf);
+ init();
+ }
+
+ private void init() throws IOException {
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+ //load table descriptor
+ htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+ Set<String> snapshotRegionNames
+ = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+ if (snapshotRegionNames == null) {
+ throw new IllegalArgumentException("Snapshot seems empty");
+ }
+
+ regions = new ArrayList<HRegionInfo>(snapshotRegionNames.size());
+ for (String regionName : snapshotRegionNames) {
+ // load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
+ regionDir);
+
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey())) {
+ regions.add(hri);
+ }
+ }
+
+ // sort the regions by start key.
+ Collections.sort(regions);
+
+ initScanMetrics(scan);
+
+ RestoreSnapshotHelper.restoreSnapshotForScanner(conf, fs,
+ rootDir, restoreDir, snapshotName);
+ }
+
+ @Override
+ public Result next() throws IOException {
+ Result result = null;
+ while (true) {
+ if (currentRegionScanner == null) {
+ currentRegion++;
+ if (currentRegion >= regions.size()) {
+ return null;
+ }
+
+ HRegionInfo hri = regions.get(currentRegion);
+ currentRegionScanner = new ClientSideRegionScanner(conf, fs,
+ restoreDir, htd, hri, scan, scanMetrics);
+ if (this.scanMetrics != null) {
+ this.scanMetrics.countOfRegions.incrementAndGet();
+ }
+ }
+
+ result = currentRegionScanner.next();
+ if (result != null) {
+ return result;
+ } else {
+ currentRegionScanner.close();
+ currentRegionScanner = null;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (currentRegionScanner != null) {
+ currentRegionScanner.close();
+ }
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index f4b9517..e5ed87f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
+import org.cliffc.high_scale_lib.Counter;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -114,6 +115,32 @@ public class TableMapReduceUtil {
job, true);
}
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param inputFormatClass the input format
+ * @throws IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+ throws IOException {
+ initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, true, inputFormatClass);
+ }
+
+
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
@@ -127,13 +154,16 @@ public class TableMapReduceUtil {
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
+ * @param initCredentials whether to initialize hbase auth credentials for the job
+ * @param inputFormatClass the input format
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
- boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+ boolean addDependencyJars, boolean initCredentials,
+ Class<? extends InputFormat> inputFormatClass)
throws IOException {
job.setInputFormatClass(inputFormatClass);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
@@ -152,7 +182,9 @@ public class TableMapReduceUtil {
if (addDependencyJars) {
addDependencyJars(job);
}
- initCredentials(job);
+ if (initCredentials) {
+ initCredentials(job);
+ }
}
/**
@@ -232,6 +264,39 @@ public class TableMapReduceUtil {
}
/**
+ * Sets up the job for reading from a table snapshot. It bypasses hbase servers
+ * and reads directly from snapshot files.
+ *
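+ * A hedged driver sketch; the snapshot name, restore path, and mapper/output classes below are
+ * illustrative placeholders, not part of this patch:
+ * <pre>
+ *   Job job = new Job(conf, "scan_snapshot");
+ *   Scan scan = new Scan();
+ *   TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan,
+ *       MyMapper.class, Text.class, LongWritable.class, job,
+ *       true, new Path("/tmp/snapshot_restore"));
+ *   job.waitForCompletion(true);
+ * </pre>
+ *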
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ *
+ * @param tmpTableDir temporary directory in the filesystem to restore the
+ * table snapshot into.
+ * @throws IOException When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Path tmpTableDir)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(job, snapshotName, tmpTableDir);
+ initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
+
+ // The job also needs libraries that hbase-server depends on (such as high_scale_lib's Counter)
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
+ }
+
+ /**
* Use this before submitting a Multi TableMap job. It will appropriately set
* up the job.
*
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index a21f4e0..51ca992 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -35,11 +35,9 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.util.StringUtils;
/**
@@ -97,7 +95,7 @@ public class TableRecordReaderImpl {
* @return The getCounter method or null if not available.
* @throws IOException
*/
- private Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
+ protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
throws IOException {
Method m = null;
try {
@@ -253,11 +251,6 @@ public class TableRecordReaderImpl {
* @throws IOException
*/
private void updateCounters() throws IOException {
- // we can get access to counters only if hbase uses new mapreduce APIs
- if (this.getCounter == null) {
- return;
- }
-
byte[] serializedMetrics = currentScan.getAttribute(
Scan.SCAN_ATTRIBUTES_METRICS_DATA);
if (serializedMetrics == null || serializedMetrics.length == 0 ) {
@@ -266,16 +259,25 @@ public class TableRecordReaderImpl {
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
+ updateCounters(scanMetrics, numRestarts, getCounter, context);
+ }
+
+ protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
+ Method getCounter, TaskAttemptContext context) {
+ // we can get access to counters only if hbase uses new mapreduce APIs
+ if (getCounter == null) {
+ return;
+ }
+
try {
for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
- Counter ct = (Counter)this.getCounter.invoke(context,
+ Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
-
- ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
- "NUM_SCANNER_RESTARTS")).increment(numRestarts);
+ ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
+ "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
new file mode 100644
index 0000000..e56279e
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
+ * bypasses HBase servers and accesses the underlying files (hfiles, recovered edits,
+ * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be
+ * restored to the live cluster or cloned. This also allows the mapreduce job to be run against an
+ * online or offline hbase cluster. The snapshot files can be exported with the
+ * {@link ExportSnapshot} tool to a pure-hdfs cluster, and this InputFormat can be used to
+ * run the mapreduce job directly over the snapshot files.
+ *
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
+ * can be used to configure the job.
+ *
+ * Internally, this input format restores the snapshot into the given tmp directory. Similar to
+ * {@link TableInputFormat}, an InputSplit is created per region. The region is opened for reading
+ * from each RecordReader. An internal RegionScanner is used to execute the {@link Scan} obtained
+ * from the user.
+ *
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
+ * snapshot files and data files. HBase also enforces security because all the requests are handled
+ * by the server layer, and the user cannot read from the data files directly. To read from snapshot
+ * files directly from the file system, the user who is running the MR job must have sufficient
+ * permissions to access snapshot and reference files. This means that to run mapreduce over
+ * snapshot files, the MR job has to be run as the HBase user or the user must have group or other
+ * privileges in the filesystem (see HBASE-8369). Note that granting other users access to read from
+ * snapshot/data files will completely circumvent the access control enforced by HBase.
+ * @see TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+ // TODO: Snapshot files are owned in fs by the hbase user. There is no
+ // easy way to delegate access.
+
+ private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
+
+ /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+ private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
+ private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
+
+ private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+ private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+
+ public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
+ private String regionName;
+ private String[] locations;
+
+ // constructor for mapreduce framework / Writable
+ public TableSnapshotRegionSplit() { }
+
+ TableSnapshotRegionSplit(String regionName, List<String> locations) {
+ this.regionName = regionName;
+ if (locations == null || locations.isEmpty()) {
+ this.locations = new String[0];
+ } else {
+ this.locations = locations.toArray(new String[locations.size()]);
+ }
+ }
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ //TODO: We can obtain the file sizes of the snapshot here.
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return locations;
+ }
+
+ // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+ // doing this wrapping with Writables.
+ @Override
+ public void write(DataOutput out) throws IOException {
+ MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
+ MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
+ .setRegion(RegionSpecifier.newBuilder()
+ .setType(RegionSpecifierType.ENCODED_REGION_NAME)
+ .setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build());
+
+ for (String location : locations) {
+ builder.addLocations(location);
+ }
+
+ MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ split.writeTo(baos);
+ baos.close();
+ byte[] buf = baos.toByteArray();
+ out.writeInt(buf.length);
+ out.write(buf);
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ byte[] buf = new byte[len];
+ in.readFully(buf);
+ MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+ this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
+ List<String> locationsList = split.getLocationsList();
+ this.locations = locationsList.toArray(new String[locationsList.size()]);
+ }
+ }
+
+ @VisibleForTesting
+ class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
+ private TableSnapshotRegionSplit split;
+ private Scan scan;
+ private Result result = null;
+ private ImmutableBytesWritable row = null;
+ private ClientSideRegionScanner scanner;
+ private TaskAttemptContext context;
+ private Method getCounter;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+ this.split = (TableSnapshotRegionSplit) split;
+ String regionName = this.split.regionName;
+ String snapshotName = getSnapshotName(conf);
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user specified root
+ // directory where snapshot was restored
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+ //load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+ //load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+ // create scan
+ String scanStr = conf.get(TableInputFormat.SCAN);
+ if (scanStr == null) {
+ throw new IllegalArgumentException("A Scan is not configured for this job");
+ }
+ scan = TableMapReduceUtil.convertStringToScan(scanStr);
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine,
+ // otherwise we have to set the thread read point
+
+ scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+ if (context != null) {
+ this.context = context;
+ getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ result = scanner.next();
+ if (result == null) {
+ //we are done
+ return false;
+ }
+
+ if (this.row == null) {
+ this.row = new ImmutableBytesWritable();
+ }
+ this.row.set(result.getRow());
+
+ ScanMetrics scanMetrics = scanner.getScanMetrics();
+ if (scanMetrics != null && context != null) {
+ TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+ }
+
+ return true;
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return row;
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return result;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0; // TODO: use total bytes to estimate
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
+ }
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TableSnapshotRegionRecordReader();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+ Configuration conf = job.getConfiguration();
+ String snapshotName = getSnapshotName(conf);
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+ Set<String> snapshotRegionNames
+ = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+ if (snapshotRegionNames == null) {
+ throw new IllegalArgumentException("Snapshot seems empty");
+ }
+
+ // load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
+ snapshotDir);
+
+ Scan scan = TableMapReduceUtil.convertStringToScan(conf
+ .get(TableInputFormat.SCAN));
+ Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ for (String regionName : snapshotRegionNames) {
+ // load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
+ regionDir);
+
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey())) {
+ // compute HDFS locations from snapshot files (which will get the locations for
+ // referred hfiles)
+ List<String> hosts = getBestLocations(conf,
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+ int len = Math.min(3, hosts.size());
+ hosts = hosts.subList(0, len);
+ splits.add(new TableSnapshotRegionSplit(regionName, hosts));
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+ * weights into account and thus will treat every location passed from the input split as equal. We
+ * do not want to blindly pass all the locations, since we are creating one split per region, and
+ * the region's blocks are distributed throughout the cluster unless favored node assignment
+ * is used. In the expected stable case, only one location will hold most of the blocks locally;
+ * with favored node assignment, 3 nodes will hold highly local blocks. Here
+ * we use a simple heuristic: we pass all hosts which have at least 80%
+ * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
+ * host with the best locality.
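+ * <p>
+ * For example (illustrative numbers, not from this patch): with the default multiplier of 0.8,
+ * if the host with the best locality holds 100MB of the region's blocks, every host holding at
+ * least 80MB is also returned; since the hosts are ordered by weight, iteration stops at the
+ * first host below the cutoff.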
+ */
+ @VisibleForTesting
+ List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
+ List<String> locations = new ArrayList<String>(3);
+
+ HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+ if (hostAndWeights.length == 0) {
+ return locations;
+ }
+
+ HostAndWeight topHost = hostAndWeights[0];
+ locations.add(topHost.getHost());
+
+ // Heuristic: include every host whose weight is at least cutoffMultiplier times the top host's weight
+ double cutoffMultiplier
+ = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+ double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+ for (int i = 1; i < hostAndWeights.length; i++) {
+ if (hostAndWeights[i].getWeight() >= filterWeight) {
+ locations.add(hostAndWeights[i].getHost());
+ } else {
+ break;
+ }
+ }
+
+ return locations;
+ }
+
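+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from the given snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into; it must be on the same
+ * filesystem as the hbase root directory and must not be a subdirectory of it
+ * @throws IOException if the snapshot cannot be restored
+ */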
+ public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
+ Configuration conf = job.getConfiguration();
+ conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ // TODO: restore from record readers to parallelize.
+ RestoreSnapshotHelper.restoreSnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+ conf.set(TABLE_DIR_KEY, restoreDir.toString());
+ }
+
+ private static String getSnapshotName(Configuration conf) {
+ String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+ if (snapshotName == null) {
+ throw new IllegalArgumentException("Snapshot name must be provided");
+ }
+ return snapshotName;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 88a15e9..0f7967a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -748,8 +748,23 @@ public class HRegion implements HeapSize { // , Writable{
*/
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
- HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
+ return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
+ }
+
+ /**
+ * This is a helper function to compute HDFS block distribution on demand
+ * @param conf configuration
+ * @param tableDescriptor HTableDescriptor of the table
+ * @param regionInfo the region descriptor
+ * @param tablePath the table directory
+ * @return The HDFS blocks distribution for the given region.
+ * @throws IOException
+ */
+ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
+ final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
+ throws IOException {
+ HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
FileSystem fs = tablePath.getFileSystem(conf);
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
@@ -1614,7 +1629,7 @@ public class HRegion implements HeapSize { // , Writable{
// Record latest flush time
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
-
+
// Update the last flushed sequence id for region
if (this.rsServices != null) {
completeSequenceId = flushSeqId;
@@ -2077,7 +2092,7 @@ public class HRegion implements HeapSize { // , Writable{
lastIndexExclusive++;
continue;
}
-
+
// If we haven't got any rows in our batch, we should block to
// get the next one.
boolean shouldBlock = numReadyToWrite == 0;
@@ -2158,8 +2173,8 @@ public class HRegion implements HeapSize { // , Writable{
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
- MiniBatchOperationInProgress miniBatchOp =
- new MiniBatchOperationInProgress(batchOp.operations,
+ MiniBatchOperationInProgress miniBatchOp =
+ new MiniBatchOperationInProgress(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
}
@@ -2231,7 +2246,7 @@ public class HRegion implements HeapSize { // , Writable{
locked = false;
}
releaseRowLocks(acquiredRowLocks);
-
+
// -------------------------
// STEP 7. Sync wal.
// -------------------------
@@ -2241,8 +2256,8 @@ public class HRegion implements HeapSize { // , Writable{
walSyncSuccessful = true;
// calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
- MiniBatchOperationInProgress miniBatchOp =
- new MiniBatchOperationInProgress(batchOp.operations,
+ MiniBatchOperationInProgress miniBatchOp =
+ new MiniBatchOperationInProgress(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
@@ -3172,7 +3187,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
-
+
// allocate new lock for this thread
return rowLockContext.newLock();
} finally {
@@ -3884,11 +3899,36 @@ public class HRegion implements HeapSize { // , Writable{
final HLog hlog,
final boolean initialize, final boolean ignoreHLog)
throws IOException {
+ Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+ return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+ }
+
+ /**
+ * Convenience method creating new HRegions. Used by createTable.
+ * The {@link HLog} for the created region needs to be closed
+ * explicitly, if it is not null.
+ * Use {@link HRegion#getLog()} to get access.
+ *
+ * @param info Info for region to create.
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param conf the configuration to use
+ * @param hTableDescriptor the table descriptor
+ * @param hlog shared HLog
+ * @param initialize - true to initialize the region
+ * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
+ * @return new HRegion
+ * @throws IOException
+ */
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog,
+ final boolean initialize, final boolean ignoreHLog)
+ throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
-
- Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
FileSystem fs = FileSystem.get(conf);
HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
HLog effectiveHLog = hlog;
@@ -4044,15 +4084,39 @@ public class HRegion implements HeapSize { // , Writable{
final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
+ Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+ return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
+ }
+
+ /**
+ * Open a Region.
+ * @param conf The Configuration object to use.
+ * @param fs Filesystem to use
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir the table directory
+ * @param info Info for region to be opened.
+ * @param htd the table descriptor
+ * @param wal HLog for region to use. This method will call
+ * HLog#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * up. HRegionStore does this every time it opens a new region.
+ * @param rsServices An interface we can request flushes against.
+ * @param reporter An interface we can report progress against.
+ * @return new HRegion
+ * @throws IOException
+ */
+ public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
+ final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+ final RegionServerServices rsServices, final CancelableProgressable reporter)
+ throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("Opening region: " + info);
}
- Path dir = FSUtils.getTableDir(rootDir, info.getTable());
- HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
+ HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
return r.openHRegion(reporter);
}
+
/**
* Useful when reopening a closed region (normally for unit tests)
* @param other original object
@@ -4641,7 +4705,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = stores.get(family.getKey());
List kvs = new ArrayList(family.getValue().size());
-
+
Collections.sort(family.getValue(), store.getComparator());
// Get previous values for all columns in this family
Get get = new Get(row);
@@ -4650,10 +4714,10 @@ public class HRegion implements HeapSize { // , Writable{
get.addColumn(family.getKey(), kv.getQualifier());
}
List results = get(get, false);
-
+
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
-
+
// Avoid as much copying as possible. Every byte is copied at most
// once.
// Would be nice if KeyValue had scatter/gather logic
@@ -4696,10 +4760,10 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
newKV.getBuffer(), newKV.getQualifierOffset(),
kv.getQualifierLength());
-
+
newKV.setMvccVersion(w.getWriteNumber());
kvs.add(newKV);
-
+
// Append update to WAL
if (writeToWAL) {
if (walEdits == null) {
@@ -4708,11 +4772,11 @@ public class HRegion implements HeapSize { // , Writable{
walEdits.add(newKV);
}
}
-
+
//store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs);
}
-
+
// Actually write to WAL now
if (writeToWAL) {
// Using default cluster id, as this can only happen in the orginating
@@ -4724,7 +4788,7 @@ public class HRegion implements HeapSize { // , Writable{
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
-
+
//Actually write to Memstore now
for (Map.Entry> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
@@ -4816,7 +4880,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = stores.get(family.getKey());
List kvs = new ArrayList(family.getValue().size());
-
+
// Get previous values for all columns in this family
Get get = new Get(row);
for (Cell cell: family.getValue()) {
@@ -4825,7 +4889,7 @@ public class HRegion implements HeapSize { // , Writable{
}
get.setTimeRange(tr.getMin(), tr.getMax());
List results = get(get, false);
-
+
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
@@ -4842,13 +4906,13 @@ public class HRegion implements HeapSize { // , Writable{
}
idx++;
}
-
+
// Append new incremented KeyValue to list
KeyValue newKV =
new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount));
newKV.setMvccVersion(w.getWriteNumber());
kvs.add(newKV);
-
+
// Prepare WAL updates
if (writeToWAL) {
if (walEdits == null) {
@@ -4857,11 +4921,11 @@ public class HRegion implements HeapSize { // , Writable{
walEdits.add(newKV);
}
}
-
+
//store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs);
}
-
+
// Actually write to WAL now
if (writeToWAL) {
// Using default cluster id, as this can only happen in the orginating
@@ -5546,7 +5610,7 @@ public class HRegion implements HeapSize { // , Writable{
*/
void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
-
+
@VisibleForTesting class RowLockContext {
private final HashedBytes row;
private final CountDownLatch latch = new CountDownLatch(1);
@@ -5557,16 +5621,16 @@ public class HRegion implements HeapSize { // , Writable{
this.row = row;
this.thread = Thread.currentThread();
}
-
+
boolean ownedByCurrentThread() {
return thread == Thread.currentThread();
}
-
+
RowLock newLock() {
lockCount++;
return new RowLock(this);
}
-
+
void releaseLock() {
if (!ownedByCurrentThread()) {
throw new IllegalArgumentException("Lock held by thread: " + thread
@@ -5584,7 +5648,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
-
+
/**
* Row lock held by a given thread.
* One thread may acquire multiple locks on the same row simultaneously.
@@ -5593,11 +5657,11 @@ public class HRegion implements HeapSize { // , Writable{
public class RowLock {
@VisibleForTesting final RowLockContext context;
private boolean released = false;
-
+
@VisibleForTesting RowLock(RowLockContext context) {
this.context = context;
}
-
+
/**
* Release the given lock. If there are no remaining locks held by the current thread
* then unlock the row and allow other threads to acquire the lock.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index 34c277a..09f9d48 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.snapshot;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
@@ -37,23 +37,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@@ -471,8 +472,9 @@ public class RestoreSnapshotHelper {
}
// create the regions on disk
- ModifyRegionUtils.createRegions(conf, rootDir,
+ ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
+ @Override
public void fillRegion(final HRegion region) throws IOException {
cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
}
@@ -499,6 +501,7 @@ public class RestoreSnapshotHelper {
final String tableName = tableDesc.getTableName().getNameAsString();
SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
new FSVisitor.StoreFileVisitor() {
+ @Override
public void storeFile (final String region, final String family, final String hfile)
throws IOException {
LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
@@ -627,10 +630,13 @@ public class RestoreSnapshotHelper {
private void restoreWALs() throws IOException {
final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
snapshotTable, regionsMap);
+ // TODO: use executors to parallelize splitting
+ // TODO: once split, we do not need to split again for other restores
try {
// Recover.Edits
SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
new FSVisitor.RecoveredEditsVisitor() {
+ @Override
public void recoveredEdits (final String region, final String logfile) throws IOException {
Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
logSplitter.splitRecoveredEdit(path);
@@ -639,6 +645,7 @@ public class RestoreSnapshotHelper {
// Region Server Logs
SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
+ @Override
public void logFile (final String server, final String logfile) throws IOException {
logSplitter.splitLog(server, logfile);
}
@@ -689,4 +696,45 @@ public class RestoreSnapshotHelper {
}
return htd;
}
+
+ /**
+ * Restore the snapshot into the given directory for use by a snapshot scanner; meta changes are discarded.
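+ * <p>
+ * Illustrative call; the restore path and snapshot name are placeholders:
+ * <pre>
+ *   RestoreSnapshotHelper.restoreSnapshotForScanner(conf, fs,
+ *       FSUtils.getRootDir(conf), new Path("/tmp/snapshot_restore"), "my_snapshot");
+ * </pre>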
+ * @param conf the configuration to use
+ * @param fs the filesystem holding the snapshot and the restore directory
+ * @param rootDir the hbase root directory
+ * @param restoreDir the directory to restore the snapshot into; it must be on the same filesystem
+ * as rootDir and must not be a subdirectory of it
+ * @param snapshotName the name of the snapshot to restore
+ * @throws IOException if the snapshot cannot be restored
+ */
+ public static void restoreSnapshotForScanner(Configuration conf, FileSystem fs, Path rootDir,
+ Path restoreDir, String snapshotName) throws IOException {
+ // ensure that the restore dir is on the same filesystem as the root dir, and not under it
+ if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
+ throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " +
+ "should be the same");
+ }
+ if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
+ throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +
+ "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
+ }
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+ //load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Restoring snapshot '" + snapshotName + "' to directory " + restoreDir);
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+ RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc,
+ snapshotDir, htd, restoreDir, monitor, status);
+ helper.restoreHdfsRegions(); // TODO: parallelize.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restored table dir:" + restoreDir);
+ FSUtils.logFileSystemState(fs, restoreDir, LOG);
+ }
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
index 28b4250..b0fd3a5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
@@ -54,7 +54,7 @@ public abstract class AbstractHBaseTool implements Tool {
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
-
+
protected String[] cmdLineArgs = null;
/**
@@ -151,6 +151,11 @@ public abstract class AbstractHBaseTool implements Tool {
addOptWithArg(opt, description);
}
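+ /**
+ * Adds an option that takes an argument and registers its long name as required.
+ * @param shortOpt the short option name
+ * @param longOpt the long option name, also recorded in the required-options set
+ * @param description the option description
+ */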
+ protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
+ requiredOptions.add(longOpt);
+ addOptWithArg(shortOpt, longOpt, description);
+ }
+
protected void addOptNoArg(String opt, String description) {
options.addOption(opt, false, description);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index 9758946..369e707 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -84,6 +84,26 @@ public abstract class ModifyRegionUtils {
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final RegionFillTask task) throws IOException {
+
+ Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
+ return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+ }
+
+ /**
+ * Create new set of regions on the specified file-system.
+ * NOTE: you should add the regions to hbase:meta after this operation.
+ *
+ * @param conf {@link Configuration}
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param hTableDescriptor description of the table
+ * @param newRegions {@link HRegionInfo} that describes the regions to create
+ * @param task {@link RegionFillTask} custom code to populate region after creation
+ * @return the list of {@link HRegionInfo} for the created regions
+ * @throws IOException
+ */
+ public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
+ final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+ final RegionFillTask task) throws IOException {
if (newRegions == null) return null;
int regionNumber = newRegions.length;
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
@@ -93,21 +113,9 @@ public abstract class ModifyRegionUtils {
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegionInfo>() {
+ @Override
public HRegionInfo call() throws IOException {
- // 1. Create HRegion
- HRegion region = HRegion.createHRegion(newRegion,
- rootDir, conf, hTableDescriptor, null,
- false, true);
- try {
- // 2. Custom user code to interact with the created region
- if (task != null) {
- task.fillRegion(region);
- }
- } finally {
- // 3. Close the new region to flush to disk. Close log file too.
- region.close();
- }
- return region.getRegionInfo();
+ return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
}
});
}
@@ -129,6 +137,35 @@ public abstract class ModifyRegionUtils {
return regionInfos;
}
+ /**
+ * Create a new region on the specified file-system.
+ * @param conf {@link Configuration}
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param hTableDescriptor description of the table
+ * @param newRegion {@link HRegionInfo} that describes the region to create
+ * @param task {@link RegionFillTask} custom code to populate region after creation
+ * @return the {@link HRegionInfo} for the created region
+ * @throws IOException
+ */
+ public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
+ final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
+ final RegionFillTask task) throws IOException {
+ // 1. Create HRegion
+ HRegion region = HRegion.createHRegion(newRegion,
+ rootDir, tableDir, conf, hTableDescriptor, null,
+ false, true);
+ try {
+ // 2. Custom user code to interact with the created region
+ if (task != null) {
+ task.fillRegion(region);
+ }
+ } finally {
+ // 3. Close the new region to flush to disk. Close log file too.
+ region.close();
+ }
+ return region.getRegionInfo();
+ }
+
/*
* used by createRegions() to get the thread pool executor based on the
* "hbase.hregion.open.and.init.threads.max" property.
@@ -142,6 +179,7 @@ public abstract class ModifyRegionUtils {
new ThreadFactory() {
private int count = 1;
+ @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index ce740d5..a637ac0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -81,7 +80,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
@@ -165,6 +163,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* mini dfs.
* @deprecated can be used only with mini dfs
*/
+ @Deprecated
private static final String TEST_DIRECTORY_KEY = "test.build.data";
/** Filesystem URI used for map-reduce mini-cluster setup */
@@ -186,7 +185,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
{ new Boolean(false) },
{ new Boolean(true) }
});
-
+
/** This is for unit tests parameterized with a single boolean. */
public static final List