Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1573061) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -1045,6 +1045,18 @@ * @throws IOException */ public int loadTable(final HTable t, final byte[][] f) throws IOException { + return loadTable(t, f, null); + } + + /** + * Load table of multiple column families with rows from 'aaa' to 'zzz'. + * @param t Table + * @param f Array of Families to load + * @param value the values of the KVs. If null is passed, the row key is used as value + * @return Count of rows loaded. + * @throws IOException + */ + public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException { t.setAutoFlush(false); byte[] k = new byte[3]; int rowCount = 0; @@ -1056,7 +1068,7 @@ k[2] = b3; Put put = new Put(k); for (int i = 0; i < f.length; i++) { - put.add(f[i], null, k); + put.add(f[i], null, value != null ? value : k); } t.put(put); rowCount++; @@ -1067,6 +1079,51 @@ return rowCount; } + /** A tracker for tracking and validating table rows + * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])} + */ + public static class SeenRowTracker { + int dim = 'z' - 'a' + 1; + int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen + byte[] startRow; + byte[] stopRow; + + public SeenRowTracker(byte[] startRow, byte[] stopRow) { + this.startRow = startRow; + this.stopRow = stopRow; + } + + int i(byte b) { + return b - 'a'; + } + + public void addRow(byte[] row) { + seenRows[i(row[0])][i(row[1])][i(row[2])]++; + } + + /** Validate that all the rows between startRow and stopRow are seen exactly once, and + * all other rows none + */ + public void validate() { + for (byte b1 = 'a'; b1 <= 'z'; b1++) { + for (byte b2 = 'a'; b2 <= 'z'; b2++) { + for (byte b3 = 'a'; b3 <= 'z'; b3++) { + int count = seenRows[i(b1)][i(b2)][i(b3)]; + int expectedCount = 0; + if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0 + && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) { + expectedCount = 1; + } + if (count != expectedCount) { + String row = new String(new byte[] {b1,b2,b3}); + throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount); + } + } + } + } + } + } + /** * Load region with rows from 'aaa' to 'zzz'. * @param r Region Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (revision 0) @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestTableSnapshotInputFormat { + + private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final int NUM_REGION_SERVERS = 2; + private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")}; + public static byte[] bbb = Bytes.toBytes("bbb"); + public static byte[] yyy = Bytes.toBytes("yyy"); + + public void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(NUM_REGION_SERVERS); + } + + public void tearDownCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private static void setupConf(Configuration conf) { + // Enable snapshot + conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + } + + @After + public void tearDown() throws Exception { + } + + public static enum TestTableSnapshotCounters { + VALIDATION_ERROR + } + + public static class TestTableSnapshotMapper + extends TableMapper { + @Override + protected void map(ImmutableBytesWritable key, Result value, + Context context) throws IOException, InterruptedException { + // Validate a single row coming from the snapshot, and emit the row key + verifyRowFromMap(key, value); + context.write(key, NullWritable.get()); + } + } + + public static class TestTableSnapshotReducer + extends Reducer { + HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy); + @Override + protected void reduce(ImmutableBytesWritable key, Iterable values, + Context context) throws IOException, InterruptedException { + rowTracker.addRow(key.get()); + } + + @Override + 
protected void cleanup(Context context) throws IOException, + InterruptedException { + rowTracker.validate(); + } + } + + public static void createTableAndSnapshot(HBaseTestingUtility util, byte[] tableName, + String snapshotName, int numRegions) + throws Exception { + try { + util.deleteTable(tableName); + } catch(Exception ex) { + // ignore + } + + if (numRegions > 1) { + util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions); + } else { + util.createTable(tableName, FAMILIES); + } + HBaseAdmin admin = util.getHBaseAdmin(); + + // put some stuff in the table + HTable table = new HTable(util.getConfiguration(), tableName); + util.loadTable(table, FAMILIES); + + Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); + + SnapshotTestingUtils.createSnapshotAndValidate(admin, Bytes.toString(tableName), + Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true); + + // load different values + byte[] value = Bytes.toBytes("after_snapshot_value"); + util.loadTable(table, FAMILIES, value); + + // cause flush to create new files in the region + admin.flush(tableName); + table.close(); + } + + @Test + public void testWithMockedMapReduceSingleRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); + } + + @Test + public void testWithMockedMapReduceMultiRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); + } + + public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits) + throws Exception { + setupCluster(); + byte[] tableName = Bytes.toBytes("testWithMockedMapReduce"); + try { + createTableAndSnapshot(util, tableName, snapshotName, numRegions); + + Job job = new Job(util.getConfiguration()); + Path tmpTableDir = util.getDataTestDir(snapshotName); + Scan scan = new Scan(bbb, yyy); // limit the scan + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy); + + } finally { + util.getHBaseAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + tearDownCluster(); + } + } + + private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, + byte[] startRow, byte[] stopRow) + throws IOException, InterruptedException { + TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + List splits = tsif.getSplits(job); + + Assert.assertEquals(expectedNumSplits, splits.size()); + + HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + + for (int i = 0; i < splits.size(); i++) { + // validate input split + InputSplit split = splits.get(i); + Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + + // validate record reader + TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); + when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); + RecordReader rr = tsif.createRecordReader(split, taskAttemptContext); + rr.initialize(split, taskAttemptContext); + + // validate we can read all the data back + while (rr.nextKeyValue()) { + byte[] row = rr.getCurrentKey().get(); + verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue()); + rowTracker.addRow(row); + } + + rr.close(); + } + + // validate all rows are seen 
+ rowTracker.validate(); + } + + public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException { + byte[] row = key.get(); + for (KeyValue kv : result.list()) { + //assert that all Cells in the Result have the same key + Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, + kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())); + } + + for (int j = 0; j < FAMILIES.length; j++) { + byte[] actual = result.getValue(FAMILIES[j], null); + Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) + + " ,actual:" + Bytes.toString(actual), row, actual); + } + } + + @Test + public void testWithMapReduceSingleRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); + } + + @Test + public void testWithMapReduceMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false); + } + + @Test + // run the MR job while HBase is offline + public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); + } + + private void testWithMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { + setupCluster(); + util.startMiniMapReduceCluster(); + try { + Path tableDir = util.getDataTestDir(snapshotName); + byte[] tableName = Bytes.toBytes("testWithMapReduce"); + doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions, + expectedNumSplits, shutdownCluster); + } finally { + util.shutdownMiniMapReduceCluster(); + tearDownCluster(); + } + } + + // this is also called by the IntegrationTestTableSnapshotInputFormat + public static void doTestWithMapReduce(HBaseTestingUtility util, byte[] tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster) + throws Exception { + + //create the table and snapshot + createTableAndSnapshot(util, tableName, snapshotName, numRegions); + + if (shutdownCluster) { + util.shutdownMiniHBaseCluster(); + } + + try { + // create the job + Job job = new Job(util.getConfiguration()); + Scan scan = new Scan(bbb, yyy); // limit the scan + + job.setJarByClass(util.getClass()); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, true, tableDir); + + job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); + job.setNumReduceTasks(1); + job.setOutputFormatClass(NullOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + } finally { + if (!shutdownCluster) { + util.getHBaseAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + } + } + } +} Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormatScan.java (revision 0) @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + *

+ * <p>
+ * Tests scanning a snapshot. Covers various scan start and stop row scenarios:
+ * the boundaries are set on a Scan and verified in a MapReduce job to confirm
+ * that they are handed over and applied properly.
+ * </p>

+ */ +@Category(LargeTests.class) +public class TestTableSnapshotInputFormatScan { + + static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormatScan.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); + static final byte[] SNAPSHOT_NAME = Bytes.toBytes("scantest_snaphot"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + private static HTable table = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // config snapshot support + TEST_UTIL.getConfiguration().setBoolean( + SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(TableSnapshotInputFormat.class); + + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + + // create and fill table + table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); + TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); + TEST_UTIL.loadTable(table, INPUT_FAMILY); + TEST_UTIL.getHBaseAdmin().disableTable(TABLE_NAME); + TEST_UTIL.getHBaseAdmin().snapshot(SNAPSHOT_NAME, TABLE_NAME); + TEST_UTIL.getHBaseAdmin().enableTable(TABLE_NAME); + + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBA() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "bba", "baz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBB() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "bbb", "bba"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ? last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + + FileSystem fs = FileSystem.get(c); + Path tmpDir = new Path("/" + UUID.randomUUID()); + fs.mkdirs(tmpDir); + try { + TableMapReduceUtil.initTableSnapshotMapperJob(Bytes.toString(SNAPSHOT_NAME), + scan, TestTableInputFormatScanBase.ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, + false, tmpDir); + job.setReducerClass(TestTableInputFormatScanBase.ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion - job " + jobName); + } finally { + fs.delete(tmpDir, true); + } + + } + +} Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (revision 0) @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * SnapshotInputFormat allows a MapReduce job to run over a table snapshot. The + * job bypasses HBase servers, and directly accesses the underlying files + * (hfile, recovered edits, hlogs, etc) directly to provide maximum performance. + * The snapshot is not required to be restored or cloned. This also allows to + * run the mapreduce job from an online or offline hbase cluster. The snapshot + * files can be exported by using the ExportSnapshot tool, to a pure-hdfs + * cluster, and this InputFormat can be used to run the mapreduce job directly + * over the snapshot files. + *

+ * <p>
+ * Usage is similar to TableInputFormat.
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
+ * boolean, Path)} can be used to configure the job.
+ * </p>
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+ *     MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, tmpDir);
+ * }</pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the given tmp
+ * directory. Similar to {@link TableInputFormat}, an {@link InputSplit} is created per region.
+ * The region is opened for reading from each RecordReader. An internal
+ * RegionScanner is used to execute the Scan obtained from the user.
+ * </p>
+ * <p>
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from + * snapshot files and data files. HBase also enforces security because all the requests are handled + * by the server layer, and the user cannot read from the data files directly. + * To read from snapshot files directly from the file system, the user who is running the MR job + * must have sufficient permissions to access snapshot and reference files. + * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase + * user or the user must have group or other priviledges in the filesystem (See HBASE-8369). + * Note that, given other users access to read from snapshot/data files will completely circumvent + * the access control enforced by HBase. + */ +public final class TableSnapshotInputFormat extends + InputFormat { + // TODO: Snapshots files are owned in fs by the hbase user. There is no + // easy way to delegate access. + + private static final String SNAPSHOT_NAME_KEY = "hbase.mr.snapshot.input.name"; + private static final String TABLE_DIR_KEY = "hbase.mr.snapshot.input.table.dir"; + + /** + * Snapshot region split. + */ + public static final class TableSnapshotRegionSplit extends InputSplit implements + Writable { + private String regionName; + private String[] locations; + + /** + * Constructor for serialization. + */ + public TableSnapshotRegionSplit() { + } + + /** + * Constructor. + * + * @param regionName + * Region name + * @param locationList + * List of nodes with the region's HDFS blocks, in descending order + * of weight + */ + public TableSnapshotRegionSplit(final String regionName, + final List locationList) { + this.regionName = regionName; + + // only use the top node + List list = locationList.size() > 1 ? locationList.subList(0, 1) + : locationList; + this.locations = list.toArray(new String[list.size()]); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return locations.length; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return locations; + } + + @Override + public void readFields(DataInput in) throws IOException { + regionName = Text.readString(in); + int locLength = in.readInt(); + locations = new String[locLength]; + for (int i = 0; i < locLength; i++) { + locations[i] = Text.readString(in); + } + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, regionName); + out.writeInt(locations.length); + for (String l : locations) { + Text.writeString(out, l); + } + } + } + + /** + * Snapshot region record reader. 
+ */ + public static final class TableSnapshotRegionRecordReader extends + RecordReader { + private TableSnapshotRegionSplit split; + private HRegion region; + private Scan scan; + private RegionScanner scanner; + private List values; + private Result result = null; + private ImmutableBytesWritable row = null; + private boolean more; + + @Override + public void initialize(final InputSplit aSplit, + final TaskAttemptContext context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + this.split = (TableSnapshotRegionSplit) aSplit; + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + String snapshotName = getSnapshotName(conf); + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir( + snapshotName, rootDir); + + // load region descriptor + String regionName = this.split.regionName; + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir); + + // create scan + scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + // region is immutable, this should be fine, otherwise we have to set the + // thread read point... + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, + snapshotDir); + Path tableDir = new Path(conf.get(TABLE_DIR_KEY)); + + // open region from the snapshot directory + this.region = openRegion(tableDir, fs, conf, hri, htd); + + // create region scanner + this.scanner = region.getScanner(scan); + values = new ArrayList(); + this.more = true; + + region.startRegionOperation(); + } + + private HRegion openRegion(final Path tableDir, final FileSystem fs, + final Configuration conf, final HRegionInfo hri, + final HTableDescriptor htd) throws IOException { + HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null); + r.initialize(null); + return r; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + values.clear(); + // RegionScanner.next() has a different contract than + // RecordReader.nextKeyValue(). Scanner + // indicates no value read by returning empty results. 
Returns boolean + // indicates if more + // rows exist AFTER this one + if (!more) { + return false; + } + more = scanner.nextRaw(values, scan.getBatch(), null); + if (values == null || values.isEmpty()) { + // we are done + return false; + } + + this.result = new Result(values); + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + + return true; + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, + InterruptedException { + return row; + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return result; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + try { + if (this.scanner != null) { + this.scanner.close(); + } + } finally { + if (region != null) { + region.closeRegionOperation(); + region.close(true); + } + } + } + } + + @Override + public RecordReader createRecordReader( + final InputSplit split, final TaskAttemptContext context) + throws IOException { + return new TableSnapshotRegionRecordReader(); + } + + @Override + public List getSplits(final JobContext job) throws IOException, + InterruptedException { + Configuration conf = job.getConfiguration(); + String snapshotName = getSnapshotName(job.getConfiguration()); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir( + snapshotName, rootDir); + + Set snapshotRegionNames = SnapshotReferenceUtil + .getSnapshotRegionNames(fs, snapshotDir); + if (snapshotRegionNames == null) { + throw new IllegalArgumentException("Snapshot is empty"); + } + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, + snapshotDir); + + Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + + List splits = new ArrayList( + snapshotRegionNames.size()); + for (String regionName : snapshotRegionNames) { + // load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir); + + if (keyRangesOverlap(scan.getStartRow(), scan.getStopRow(), + hri.getStartKey(), hri.getEndKey())) { + List hosts = HRegion.computeHDFSBlocksDistribution(conf, htd, + hri.getEncodedName()).getTopHosts(); + splits.add(new TableSnapshotRegionSplit(regionName, hosts)); + } + + } + + return splits; + } + + private boolean keyRangesOverlap(final byte[] start1, final byte[] end1, + final byte[] start2, final byte[] end2) { + return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1, + end2) < 0) + && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, + end1) < 0); + } + + /** + * Set job input. 
+ * + * @param job + * The job + * @param snapshotName + * The snapshot name + * @param restoreDir + * The directory where the temp table will be created + * @throws IOException + * on error + */ + public static void setInput(final Job job, final String snapshotName, + final Path restoreDir) throws IOException { + Configuration conf = job.getConfiguration(); + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir( + snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils + .readSnapshotInfo(fs, snapshotDir); + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, + snapshotDir); + + Path tableDir = new Path(restoreDir, htd.getNameAsString()); + conf.set(TABLE_DIR_KEY, tableDir.toString()); + + MonitoredTask status = TaskMonitor.get().createStatus( + "Restoring snapshot '" + snapshotName + "' to directory " + tableDir); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(); + + RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, + snapshotDesc, snapshotDir, htd, tableDir, monitor, status); + helper.restoreHdfsRegions(); + } + + private static String getSnapshotName(final Configuration conf) { + String snapshotName = conf.get(SNAPSHOT_NAME_KEY); + if (snapshotName == null) { + throw new IllegalArgumentException("Snapshot name must be provided"); + } + return snapshotName; + } +} + Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1573061) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy) @@ -289,6 +289,55 @@ } } + /** + * Sets up the job for reading from a table snapshot. It bypasses hbase + * servers and read directly from snapshot files. + * + * @param snapshotName + * The name of the snapshot (of a table) to read from. + * @param scan + * The scan instance with the columns, time range etc. + * @param mapper + * The mapper class to use. + * @param outputKeyClass + * The class of the output key. + * @param outputValueClass + * The class of the output value. + * @param job + * The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars + * upload HBase jars and jars for any of the configured job classes + * via the distributed cache (tmpjars). + * @param tableRootDir + * The directory where the temp table will be created + * @throws IOException + * When setting up the details fails. 
+ * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, + Class mapper, Class outputKeyClass, Class outputValueClass, + Job job, boolean addDependencyJars, Path tableRootDir) throws IOException { + + TableSnapshotInputFormat.setInput(job, snapshotName, tableRootDir); + + Configuration conf = job.getConfiguration(); + + job.setInputFormatClass(TableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + conf.set(TableInputFormat.SCAN, convertScanToString(scan)); + + if (addDependencyJars) { + TableMapReduceUtil.addDependencyJars(job); + } + } + public static void initCredentials(Job job) throws IOException { UserProvider provider = UserProvider.instantiate(job.getConfiguration());
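
A minimal, hypothetical driver sketch built against the initTableSnapshotMapperJob() API added by this patch: a map-only job that counts the rows of a snapshot without going through the region servers. The class name SnapshotRowCounter, the snapshot name "my_snapshot", and the restore directory are illustrative placeholders rather than part of the patch, and the code is an untested outline of the intended usage.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotRowCounter {

  /** Map-only pass over the snapshot files; counts rows via a job counter. */
  static class RowCountMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.getCounter("snapshot", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-row-counter");
    job.setJarByClass(SnapshotRowCounter.class);

    // Full scan of the snapshot; narrow with start/stop rows or column families as needed.
    Scan scan = new Scan();

    // "my_snapshot" and the restore directory are placeholders; the restore directory is
    // where the snapshot is re-materialized as region directories for the duration of the job.
    Path restoreDir = new Path("/tmp/snapshot-restore-" + System.currentTimeMillis());
    TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan,
        RowCountMapper.class, NullWritable.class, NullWritable.class,
        job, true, restoreDir);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}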