Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1575665)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy)
@@ -64,6 +64,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;
+import org.cliffc.high_scale_lib.Counter;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -289,6 +290,57 @@
}
}
+ /**
+ * Sets up the job for reading from a table snapshot. It bypasses HBase
+ * servers and reads directly from the snapshot files.
+ *
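+ * <p>Illustrative usage (a sketch; {@code MyTableMapper}, {@code MyKey}, {@code MyValue}
+ * and the restore directory below are placeholders, not part of this patch):
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan,
+ *     MyTableMapper.class, MyKey.class, MyValue.class,
+ *     job, true, new Path("/tmp/snapshot_restore_dir"));
+ * }</pre>
+ *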
+ * @param snapshotName
+ * The name of the snapshot (of a table) to read from.
+ * @param scan
+ * The scan instance with the columns, time range etc.
+ * @param mapper
+ * The mapper class to use.
+ * @param outputKeyClass
+ * The class of the output key.
+ * @param outputValueClass
+ * The class of the output value.
+ * @param job
+ * The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @param addDependencyJars
+ * upload HBase jars and jars for any of the configured job classes
+ * via the distributed cache (tmpjars).
+ * @param tableRootDir
+ * The directory where the temp table will be created
+ * @throws IOException
+ * When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
+ Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+ Job job, boolean addDependencyJars, Path tableRootDir) throws IOException {
+
+ TableSnapshotInputFormat.setInput(job, snapshotName, tableRootDir);
+
+ Configuration conf = job.getConfiguration();
+
+ job.setInputFormatClass(TableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+
+ if (addDependencyJars) {
+ TableMapReduceUtil.addDependencyJars(job);
+ }
+ // the job also needs additional libraries that hbase-server depends on (e.g. high-scale-lib)
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
+ }
+
public static void initCredentials(Job job) throws IOException {
UserProvider provider = UserProvider.instantiate(job.getConfiguration());
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (working copy)
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The
+ * job bypasses HBase servers and accesses the underlying files
+ * (hfiles, recovered edits, hlogs, etc) directly to provide maximum performance.
+ * The snapshot does not have to be restored or cloned first. This also allows the
+ * mapreduce job to be run against an online or offline hbase cluster. The snapshot
+ * files can be exported with the ExportSnapshot tool to a pure-hdfs cluster,
+ * and this InputFormat can then be used to run the mapreduce job directly over
+ * the snapshot files.
+ *
+ * Usage is similar to TableInputFormat.
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
+ * boolean, Path)} can be used to configure the job.
+ *
+ *
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+ *      MyTableMapper.class, MyMapKeyOutput.class,
+ *      MyMapOutputValueWritable.class, job, true, tmpDir);
+ * }</pre>
+ *
+ *
+ * Internally, this input format restores the snapshot into the given tmp
+ * directory. Similar to {@link TableInputFormat}, an {@link InputSplit} is created per region.
+ * Each RecordReader then opens the region for reading, and an internal
+ * RegionScanner is used to execute the {@link Scan} obtained from the user.
+ *
+ *
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
+ * snapshot files and data files. HBase also enforces security because all requests are handled
+ * by the server layer, so users cannot normally read from the data files directly.
+ * To read snapshot files directly from the file system, the user who is running the MR job
+ * must have sufficient permissions to access the snapshot and reference files.
+ * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
+ * user, or the user must have group or other privileges in the filesystem (See HBASE-8369).
+ * Note that granting other users read access to snapshot/data files completely circumvents
+ */
+public final class TableSnapshotInputFormat extends
+ InputFormat<ImmutableBytesWritable, Result> {
+ // TODO: Snapshot files are owned in fs by the hbase user. There is no
+ // easy way to delegate access.
+
+ private static final String SNAPSHOT_NAME_KEY = "hbase.mr.snapshot.input.name";
+ private static final String TABLE_DIR_KEY = "hbase.mr.snapshot.input.table.dir";
+
+ /**
+ * Snapshot region split.
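+ * Wraps a single snapshot region name plus the location (host) that serves most of
+ * the region's HDFS blocks.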
+ */
+ public static final class TableSnapshotRegionSplit extends InputSplit implements
+ Writable {
+ private String regionName;
+ private String[] locations;
+
+ /**
+ * Constructor for serialization.
+ */
+ public TableSnapshotRegionSplit() {
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param regionName
+ * Region name
+ * @param locationList
+ * List of nodes with the region's HDFS blocks, in descending order
+ * of weight
+ */
+ public TableSnapshotRegionSplit(final String regionName,
+ final List<String> locationList) {
+ this.regionName = regionName;
+
+ // only use the top node
+ List<String> list = locationList.size() > 1 ? locationList.subList(0, 1)
+ : locationList;
+ this.locations = list.toArray(new String[list.size()]);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return locations.length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return locations;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ regionName = Text.readString(in);
+ int locLength = in.readInt();
+ locations = new String[locLength];
+ for (int i = 0; i < locLength; i++) {
+ locations[i] = Text.readString(in);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, regionName);
+ out.writeInt(locations.length);
+ for (String l : locations) {
+ Text.writeString(out, l);
+ }
+ }
+ }
+
+ /**
+ * Snapshot region record reader.
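+ * Reads the rows of a single snapshot region by opening the restored region
+ * directly from the filesystem and driving an internal RegionScanner.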
+ */
+ public static final class TableSnapshotRegionRecordReader extends
+ RecordReader<ImmutableBytesWritable, Result> {
+ private TableSnapshotRegionSplit split;
+ private HRegion region;
+ private Scan scan;
+ private RegionScanner scanner;
+ private List<KeyValue> values;
+ private Result result = null;
+ private ImmutableBytesWritable row = null;
+ private boolean more;
+
+ @Override
+ public void initialize(final InputSplit aSplit,
+ final TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.split = (TableSnapshotRegionSplit) aSplit;
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ String snapshotName = getSnapshotName(conf);
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+ snapshotName, rootDir);
+
+ // load region descriptor
+ String regionName = this.split.regionName;
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+
+ // create scan
+ scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+ // region is immutable, this should be fine, otherwise we have to set the
+ // thread read point...
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+ scan.setCacheBlocks(false);
+
+ // load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+ snapshotDir);
+ Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+ // open region from the snapshot directory
+ this.region = openRegion(tableDir, fs, conf, hri, htd);
+
+ // create region scanner
+ this.scanner = region.getScanner(scan);
+ values = new ArrayList<KeyValue>();
+ this.more = true;
+
+ region.startRegionOperation();
+ }
+
+ private HRegion openRegion(final Path tableDir, final FileSystem fs,
+ final Configuration conf, final HRegionInfo hri,
+ final HTableDescriptor htd) throws IOException {
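+ // the region is opened without a WAL and without RegionServerServices (both null); it is only read from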
+ HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null);
+ r.initialize(null);
+ return r;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ values.clear();
+ // RegionScanner.next() has a different contract than RecordReader.nextKeyValue():
+ // the scanner signals that nothing was read by returning empty results, while its
+ // boolean return value indicates whether more rows exist AFTER the current one.
+ if (!more) {
+ return false;
+ }
+ more = scanner.nextRaw(values, scan.getBatch(), null);
+ if (values == null || values.isEmpty()) {
+ // we are done
+ return false;
+ }
+
+ this.result = new Result(values);
+ if (this.row == null) {
+ this.row = new ImmutableBytesWritable();
+ }
+ this.row.set(result.getRow());
+
+ return true;
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return row;
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return result;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
+ } finally {
+ if (region != null) {
+ region.closeRegionOperation();
+ region.close(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ final InputSplit split, final TaskAttemptContext context)
+ throws IOException {
+ return new TableSnapshotRegionRecordReader();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(final JobContext job) throws IOException,
+ InterruptedException {
+ Configuration conf = job.getConfiguration();
+ String snapshotName = getSnapshotName(job.getConfiguration());
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+ snapshotName, rootDir);
+
+ Set<String> snapshotRegionNames = SnapshotReferenceUtil
+ .getSnapshotRegionNames(fs, snapshotDir);
+ if (snapshotRegionNames == null) {
+ throw new IllegalArgumentException("Snapshot is empty");
+ }
+
+ // load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+ snapshotDir);
+
+ Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+
+ List<InputSplit> splits = new ArrayList<InputSplit>(
+ snapshotRegionNames.size());
+ for (String regionName : snapshotRegionNames) {
+ // load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+
+ if (keyRangesOverlap(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey())) {
+ List<String> hosts = HRegion.computeHDFSBlocksDistribution(conf, htd,
+ hri.getEncodedName()).getTopHosts();
+ splits.add(new TableSnapshotRegionSplit(regionName, hosts));
+ }
+
+ }
+
+ return splits;
+ }
+
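+ // returns true if the half-open ranges [start1, end1) and [start2, end2) intersect,
+ // where an empty byte array means "unbounded" on that side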
+ private boolean keyRangesOverlap(final byte[] start1, final byte[] end1,
+ final byte[] start2, final byte[] end2) {
+ return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1,
+ end2) < 0)
+ && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
+ end1) < 0);
+ }
+
+ /**
+ * Set job input: configures the snapshot to read from and restores it under the given restore directory.
+ *
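+ * <p>Illustrative call (a sketch; the snapshot name and restore path below are placeholders):
+ * <pre>{@code
+ * TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore_dir"));
+ * }</pre>
+ *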
+ * @param job
+ * The job
+ * @param snapshotName
+ * The snapshot name
+ * @param restoreDir
+ * The directory where the temp table will be created
+ * @throws IOException
+ * on error
+ */
+ public static void setInput(final Job job, final String snapshotName,
+ final Path restoreDir) throws IOException {
+ Configuration conf = job.getConfiguration();
+ conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+ snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils
+ .readSnapshotInfo(fs, snapshotDir);
+
+ // load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+ snapshotDir);
+
+ Path tableDir = new Path(restoreDir, htd.getNameAsString());
+ conf.set(TABLE_DIR_KEY, tableDir.toString());
+
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Restoring snapshot '" + snapshotName + "' to directory " + tableDir);
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+ RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs,
+ snapshotDesc, snapshotDir, htd, tableDir, monitor, status);
+ helper.restoreHdfsRegions();
+ }
+
+ private static String getSnapshotName(final Configuration conf) {
+ String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+ if (snapshotName == null) {
+ throw new IllegalArgumentException("Snapshot name must be provided");
+ }
+ return snapshotName;
+ }
+}
+
Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1575665)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -1045,6 +1045,18 @@
* @throws IOException
*/
public int loadTable(final HTable t, final byte[][] f) throws IOException {
+ return loadTable(t, f, null);
+ }
+
+ /**
+ * Load table of multiple column families with rows from 'aaa' to 'zzz'.
+ * @param t Table
+ * @param f Array of Families to load
+ * @param value the values of the KVs. If null is passed, the row key is used as value
+ * @return Count of rows loaded.
+ * @throws IOException
+ */
+ public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
t.setAutoFlush(false);
byte[] k = new byte[3];
int rowCount = 0;
@@ -1056,7 +1068,7 @@
k[2] = b3;
Put put = new Put(k);
for (int i = 0; i < f.length; i++) {
- put.add(f[i], null, k);
+ put.add(f[i], null, value != null ? value : k);
}
t.put(put);
rowCount++;
@@ -1067,6 +1079,51 @@
return rowCount;
}
+ /** Tracks and validates table rows
+ * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}
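+ * <p>Typical use (a sketch; assumes every row in [startRow, stopRow) is fed to the
+ * tracker exactly once, e.g. while iterating a scan over a table populated by loadTable):
+ * <pre>{@code
+ * SeenRowTracker tracker = new SeenRowTracker(startRow, stopRow);
+ * for (Result r : scanner) {
+ *   tracker.addRow(r.getRow());
+ * }
+ * tracker.validate(); // throws RuntimeException on any count mismatch
+ * }</pre>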
+ */
+ public static class SeenRowTracker {
+ int dim = 'z' - 'a' + 1;
+ int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
+ byte[] startRow;
+ byte[] stopRow;
+
+ public SeenRowTracker(byte[] startRow, byte[] stopRow) {
+ this.startRow = startRow;
+ this.stopRow = stopRow;
+ }
+
+ int i(byte b) {
+ return b - 'a';
+ }
+
+ public void addRow(byte[] row) {
+ seenRows[i(row[0])][i(row[1])][i(row[2])]++;
+ }
+
+ /** Validate that every row between startRow (inclusive) and stopRow (exclusive) was seen
+ * exactly once, and that no other row was seen at all.
+ */
+ public void validate() {
+ for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+ for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+ for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+ int count = seenRows[i(b1)][i(b2)][i(b3)];
+ int expectedCount = 0;
+ if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
+ && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
+ expectedCount = 1;
+ }
+ if (count != expectedCount) {
+ String row = new String(new byte[] {b1,b2,b3});
+ throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Load region with rows from 'aaa' to 'zzz'.
* @param r Region
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (working copy)
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat {
+
+ private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final int NUM_REGION_SERVERS = 2;
+ private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+ public static byte[] bbb = Bytes.toBytes("bbb");
+ public static byte[] yyy = Bytes.toBytes("yyy");
+
+ public void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(NUM_REGION_SERVERS);
+ }
+
+ public void tearDownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void setupConf(Configuration conf) {
+ // Enable snapshot
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public static enum TestTableSnapshotCounters {
+ VALIDATION_ERROR
+ }
+
+ public static class TestTableSnapshotMapper
+ extends TableMapper<ImmutableBytesWritable, NullWritable> {
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException, InterruptedException {
+ // Validate a single row coming from the snapshot, and emit the row key
+ verifyRowFromMap(key, value);
+ context.write(key, NullWritable.get());
+ }
+ }
+
+ public static class TestTableSnapshotReducer
+ extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+ HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+ @Override
+ protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+ Context context) throws IOException, InterruptedException {
+ rowTracker.addRow(key.get());
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ rowTracker.validate();
+ }
+ }
+
+ public static void createTableAndSnapshot(HBaseTestingUtility util, byte[] tableName,
+ String snapshotName, int numRegions)
+ throws Exception {
+ try {
+ util.deleteTable(tableName);
+ } catch(Exception ex) {
+ // ignore
+ }
+
+ if (numRegions > 1) {
+ util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+ } else {
+ util.createTable(tableName, FAMILIES);
+ }
+ HBaseAdmin admin = util.getHBaseAdmin();
+
+ // put some stuff in the table
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ util.loadTable(table, FAMILIES);
+
+ Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, Bytes.toString(tableName),
+ Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+ // load different values
+ byte[] value = Bytes.toBytes("after_snapshot_value");
+ util.loadTable(table, FAMILIES, value);
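+ // the snapshot should still return the original values, proving that the scan reads the
+ // snapshot files rather than the live table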
+
+ // cause flush to create new files in the region
+ admin.flush(tableName);
+ table.close();
+ }
+
+ @Test
+ public void testWithMockedMapReduceSingleRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+ }
+
+ @Test
+ public void testWithMockedMapReduceMultiRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+ }
+
+ public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
+ throws Exception {
+ setupCluster();
+ byte[] tableName = Bytes.toBytes("testWithMockedMapReduce");
+ try {
+ createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+ Job job = new Job(util.getConfiguration());
+ Path tmpTableDir = util.getDataTestDir(snapshotName);
+ Scan scan = new Scan(bbb, yyy); // limit the scan
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+
+ } finally {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+ byte[] startRow, byte[] stopRow)
+ throws IOException, InterruptedException {
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ List<InputSplit> splits = tsif.getSplits(job);
+
+ Assert.assertEquals(expectedNumSplits, splits.size());
+
+ HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+ for (int i = 0; i < splits.size(); i++) {
+ // validate input split
+ InputSplit split = splits.get(i);
+ Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+ // validate record reader
+ TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+ when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+ RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
+ rr.initialize(split, taskAttemptContext);
+
+ // validate we can read all the data back
+ while (rr.nextKeyValue()) {
+ byte[] row = rr.getCurrentKey().get();
+ verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+ rowTracker.addRow(row);
+ }
+
+ rr.close();
+ }
+
+ // validate all rows are seen
+ rowTracker.validate();
+ }
+
+ public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
+ byte[] row = key.get();
+ for (KeyValue kv : result.list()) {
+ // assert that all KeyValues in the Result have the same row key
+ Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+ kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()));
+ }
+
+ for (int j = 0; j < FAMILIES.length; j++) {
+ byte[] actual = result.getValue(FAMILIES[j], null);
+ Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ + " ,actual:" + Bytes.toString(actual), row, actual);
+ }
+ }
+
+ @Test
+ public void testWithMapReduceSingleRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+ }
+
+ @Test
+ public void testWithMapReduceMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+ }
+
+ @Test
+ // run the MR job while HBase is offline
+ public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+ }
+
+ private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ setupCluster();
+ util.startMiniMapReduceCluster();
+ try {
+ Path tableDir = util.getDataTestDir(snapshotName);
+ byte[] tableName = Bytes.toBytes("testWithMapReduce");
+ doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
+ expectedNumSplits, shutdownCluster);
+ } finally {
+ util.shutdownMiniMapReduceCluster();
+ tearDownCluster();
+ }
+ }
+
+ // this is also called by the IntegrationTestTableSnapshotInputFormat
+ public static void doTestWithMapReduce(HBaseTestingUtility util, byte[] tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
+ throws Exception {
+
+ //create the table and snapshot
+ createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+ if (shutdownCluster) {
+ util.shutdownMiniHBaseCluster();
+ }
+
+ try {
+ // create the job
+ Job job = new Job(util.getConfiguration());
+ Scan scan = new Scan(bbb, yyy); // limit the scan
+
+ job.setJarByClass(util.getClass());
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, true, tableDir);
+
+ job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ } finally {
+ if (!shutdownCluster) {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ }
+ }
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java (working copy)
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests scanning a snapshot with various scan start and stop row scenarios.
+ * The boundaries are set on a Scan and verified in a MapReduce job to make sure
+ * they are handed over to, and honored by, the snapshot input format.
+ */
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormatScan {
+
+ static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormatScan.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+ static final byte[] SNAPSHOT_NAME = Bytes.toBytes("scantest_snaphot");
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stpRow";
+
+ private static HTable table = null;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // config snapshot support
+ TEST_UTIL.getConfiguration().setBoolean(
+ SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+ TEST_UTIL.getConfiguration().setBoolean(
+ "hbase.master.enabletable.roundrobin", true);
+
+ // switch TIF to log at DEBUG level
+ TEST_UTIL.enableDebug(TableSnapshotInputFormat.class);
+
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+
+ // create and fill table
+ table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+ TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+ TEST_UTIL.loadTable(table, INPUT_FAMILY);
+ TEST_UTIL.getHBaseAdmin().disableTable(TABLE_NAME);
+ TEST_UTIL.getHBaseAdmin().snapshot(SNAPSHOT_NAME, TABLE_NAME);
+ TEST_UTIL.getHBaseAdmin().enableTable(TABLE_NAME);
+
+ // start MR cluster
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, null, null);
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToAPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBA() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "bba", "baz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBB() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "bbb", "bba");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToOPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "opp", "opo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ protected void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty")
+ + "To" + (stop != null ? stop.toUpperCase() : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ c.set(KEY_STARTROW, start != null ? start : "");
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
+ c.set(KEY_LASTROW, last != null ? last : "");
+ LOG.info("scan before: " + scan);
+ Job job = new Job(c, jobName);
+
+ FileSystem fs = FileSystem.get(c);
+ Path tmpDir = new Path("/" + UUID.randomUUID());
+ fs.mkdirs(tmpDir);
+ try {
+ TableMapReduceUtil.initTableSnapshotMapperJob(Bytes.toString(SNAPSHOT_NAME),
+ scan, TestTableInputFormatScanBase.ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job,
+ false, tmpDir);
+ job.setReducerClass(TestTableInputFormatScanBase.ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion - job " + jobName);
+ } finally {
+ fs.delete(tmpDir, true);
+ }
+
+ }
+
+}