Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java	(revision 0)
@@ -0,0 +1,212 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * <p>
+ * Tests scanning a snapshot. Covers various scan start and stop row scenarios;
+ * each scenario is set on a Scan and verified through a MapReduce job to check
+ * that the row boundaries are handed over and honored correctly.
+ * </p>
+ */
+@Category(LargeTests.class)
+public class TestSnapshotInputFormatScan {
+
+  static final Log LOG = LogFactory.getLog(TestSnapshotInputFormatScan.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+  static final byte[] SNAPSHOT_NAME = Bytes.toBytes("scantest_snapshot");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  private static HTable table = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // config snapshot support
+    TEST_UTIL.getConfiguration().setBoolean(
+        SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+    TEST_UTIL.getConfiguration().setBoolean(
+        "hbase.master.enabletable.roundrobin", true);
+
+    // switch the input format to log at DEBUG level
+    TEST_UTIL.enableDebug(SnapshotInputFormat.class);
+
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+
+    // create and fill the table, then snapshot it while it is disabled
+    table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+    TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    TEST_UTIL.getHBaseAdmin().disableTable(TABLE_NAME);
+    TEST_UTIL.getHBaseAdmin().snapshot(SNAPSHOT_NAME, TABLE_NAME);
+    TEST_UTIL.getHBaseAdmin().enableTable(TABLE_NAME);
+
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToAPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBA() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "bba", "baz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBB() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "bbb", "bba");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToOPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "opp", "opo");
+  }
+
+  /**
+   * Runs a MR scan over the snapshot using the given start and stop rows.
+   * The expected start and last rows are handed to the job configuration so
+   * that they can be verified on the reduce side.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  protected void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty")
+        + "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = new Job(c, jobName);
+
+    FileSystem fs = FileSystem.get(c);
+    Path tmpDir = new Path("/" + UUID.randomUUID());
+    fs.mkdirs(tmpDir);
+    try {
+      TableMapReduceUtil.initSnapshotMapperJob(Bytes.toString(SNAPSHOT_NAME),
+          tmpDir, scan, TestTableInputFormatScanBase.ScanMapper.class,
+          ImmutableBytesWritable.class, ImmutableBytesWritable.class, job,
+          false);
+      job.setReducerClass(TestTableInputFormatScanBase.ScanReducer.class);
+      job.setNumReduceTasks(1); // one to get final "first" and "last" key
+      FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+      LOG.info("Started " + job.getJobName());
+      assertTrue(job.waitForCompletion(true));
+      LOG.info("After map/reduce completion - job " + jobName);
+    } finally {
+      fs.delete(tmpDir, true);
+    }
+  }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(revision 1573061)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(working copy)
@@ -289,6 +289,55 @@
     }
   }
 
+  /**
+   * Sets up the job for reading from a table snapshot. It bypasses the HBase
+   * servers and reads directly from the snapshot files.
+   *
+   * @param snapshotName
+   *          The name of the snapshot (of a table) to read from.
+   * @param tableRootDir
+   *          The directory where the temp table will be created.
+   * @param scan
+   *          The scan instance with the columns, time range etc.
+   * @param mapper
+   *          The mapper class to use.
+   * @param outputKeyClass
+   *          The class of the output key.
+   * @param outputValueClass
+   *          The class of the output value.
+   * @param job
+   *          The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars
+   *          upload HBase jars and jars for any of the configured job classes
+   *          via the distributed cache (tmpjars).
+   * @throws IOException
+   *           When setting up the details fails.
+   * @see SnapshotInputFormat
+   */
+  public static void initSnapshotMapperJob(String snapshotName, Path tableRootDir, Scan scan,
+      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      Job job, boolean addDependencyJars) throws IOException {
+
+    SnapshotInputFormat.setInput(job, snapshotName, tableRootDir);
+
+    Configuration conf = job.getConfiguration();
+
+    job.setInputFormatClass(SnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+
+    if (addDependencyJars) {
+      TableMapReduceUtil.addDependencyJars(job);
+    }
+  }
+
   public static void initCredentials(Job job) throws IOException {
     UserProvider provider = UserProvider.instantiate(job.getConfiguration());
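
For reference, a minimal standalone driver built on the method added above might look like the sketch below. It is not part of the patch: the snapshot name, column family, restore directory, and the SnapshotRowCounter class itself are illustrative placeholders; only initSnapshotMapperJob and SnapshotInputFormat come from this change, everything else is stock Hadoop and HBase API.

package org.apache.hadoop.hbase.mapreduce.example; // hypothetical package

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

/** Counts the rows of a snapshot without touching the region servers. */
public class SnapshotRowCounter {

  /** Emits nothing; only bumps a counter for every row seen. */
  static class RowCounterMapper
      extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) {
      context.getCounter("snapshot", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-row-counter");
    job.setJarByClass(SnapshotRowCounter.class);

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents")); // illustrative family

    // Directory the snapshot is restored into; it must be writable by the
    // submitting user and should be deleted once the job is done.
    Path restoreDir = new Path("/tmp/" + UUID.randomUUID());

    // "my_snapshot" is a placeholder snapshot name.
    TableMapReduceUtil.initSnapshotMapperJob("my_snapshot", restoreDir, scan,
        RowCounterMapper.class, NullWritable.class, NullWritable.class, job,
        true);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Using zero reducers and NullOutputFormat keeps the sketch focused on the input-format side; the row count is reported through the job counters, and the restore directory should be cleaned up after the job finishes, as the test above does in its finally block.
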
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java	(revision 0)
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * SnapshotInputFormat allows a MapReduce job to run over a table snapshot.
+ * The job bypasses the HBase servers and accesses the underlying files
+ * (hfiles, recovered edits, HLogs, etc.) directly, to provide maximum
+ * performance. The snapshot is not required to be restored or cloned first,
+ * and the MapReduce job can be run against an online or offline HBase
+ * cluster. The snapshot files can also be exported with the ExportSnapshot
+ * tool to a pure-HDFS cluster, and this InputFormat can then be used to run
+ * the MapReduce job directly over the exported files.
+ * <p>
+ * Usage is similar to TableInputFormat.
+ * {@link TableMapReduceUtil#initSnapshotMapperJob(String, Path, Scan, Class, Class, Class, Job, boolean)}
+ * can be used to configure the job.
+ * </p>
+ * <pre>
+ * {@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initSnapshotMapperJob(snapshotName, restoreDir, scan,
+ *     MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the supplied
+ * working directory. Similar to TableInputFormat, an InputSplit is created
+ * per region; the region is opened for reading by each RecordReader, and an
+ * internal RegionScanner is used to execute the Scan obtained from the user.
+ * </p>
+ * <p>
+ * The user has to have sufficient access rights in the file system to access
+ * the snapshot files and the files they reference.
+ * </p>
+ */
+public final class SnapshotInputFormat extends
+    InputFormat<ImmutableBytesWritable, Result> {
+  // TODO: Snapshot files are owned in the file system by the hbase user.
+  // There is no easy way to delegate access.
+
+  private static final String SNAPSHOT_NAME_KEY = "hbase.mr.snapshot.input.name";
+  private static final String TABLE_DIR_KEY = "hbase.mr.snapshot.input.table.dir";
+
+  /**
+   * Snapshot region split.
+   */
+  public static final class SnapshotRegionSplit extends InputSplit implements
+      Writable {
+    private String regionName;
+    private String[] locations;
+
+    /**
+     * Constructor for serialization.
+     */
+    public SnapshotRegionSplit() {
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param regionName
+     *          Region name
+     * @param locationList
+     *          List of nodes with the region's HDFS blocks, in descending
+     *          order of weight
+     */
+    public SnapshotRegionSplit(final String regionName,
+        final List<String> locationList) {
+      this.regionName = regionName;
+
+      // only use the top node
+      List<String> list = locationList.size() > 1 ? locationList.subList(0, 1)
+          : locationList;
+      this.locations = list.toArray(new String[list.size()]);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return locations.length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return locations;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      regionName = Text.readString(in);
+      int locLength = in.readInt();
+      locations = new String[locLength];
+      for (int i = 0; i < locLength; i++) {
+        locations[i] = Text.readString(in);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, regionName);
+      out.writeInt(locations.length);
+      for (String l : locations) {
+        Text.writeString(out, l);
+      }
+    }
+  }
+
+  /**
+   * Snapshot region record reader.
+   */
+  public static final class SnapshotRegionRecordReader extends
+      RecordReader<ImmutableBytesWritable, Result> {
+    private SnapshotRegionSplit split;
+    private HRegion region;
+    private Scan scan;
+    private RegionScanner scanner;
+    private List<KeyValue> values;
+    private Result result = null;
+    private ImmutableBytesWritable row = null;
+    private boolean more;
+
+    @Override
+    public void initialize(final InputSplit aSplit,
+        final TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.split = (SnapshotRegionSplit) aSplit;
+
+      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+      FileSystem fs = rootDir.getFileSystem(conf);
+
+      String snapshotName = getSnapshotName(conf);
+      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+          snapshotName, rootDir);
+
+      // load region descriptor
+      String regionName = this.split.regionName;
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+
+      // create scan
+      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+      // the region is immutable, so this should be fine; otherwise we would
+      // have to set the thread read point
+      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+
+      // load table descriptor
+      HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+          snapshotDir);
+      Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+      // open region from the snapshot directory
+      this.region = openRegion(tableDir, fs, conf, hri, htd);
+
+      // create region scanner
+      this.scanner = region.getScanner(scan);
+      values = new ArrayList<KeyValue>();
+      this.more = true;
+
+      region.startRegionOperation();
+    }
+
+    private HRegion openRegion(final Path tableDir, final FileSystem fs,
+        final Configuration conf, final HRegionInfo hri,
+        final HTableDescriptor htd) throws IOException {
+      HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null);
+      r.initialize(null);
+      return r;
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      values.clear();
+      // RegionScanner.next() has a different contract than
+      // RecordReader.nextKeyValue(). The scanner indicates that no value was
+      // read by returning empty results; its boolean return value indicates
+      // whether more rows exist AFTER this one.
+      if (!more) {
+        return false;
+      }
+      more = scanner.nextRaw(values, scan.getBatch(), null);
+      if (values == null || values.isEmpty()) {
+        // we are done
+        return false;
+      }
+
+      this.result = new Result(values);
+      if (this.row == null) {
+        this.row = new ImmutableBytesWritable();
+      }
+      this.row.set(result.getRow());
+
+      return true;
+    }
+
+    @Override
+    public ImmutableBytesWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return row;
+    }
+
+    @Override
+    public Result getCurrentValue() throws IOException, InterruptedException {
+      return result;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (this.scanner != null) {
+          this.scanner.close();
+        }
+      } finally {
+        if (region != null) {
+          region.closeRegionOperation();
+          region.close(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      final InputSplit split, final TaskAttemptContext context)
+      throws IOException {
+    return new SnapshotRegionRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(final JobContext job) throws IOException,
+      InterruptedException {
+    Configuration conf = job.getConfiguration();
+    String snapshotName = getSnapshotName(job.getConfiguration());
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+        snapshotName, rootDir);
+
+    Set<String> snapshotRegionNames = SnapshotReferenceUtil
+        .getSnapshotRegionNames(fs, snapshotDir);
+    if (snapshotRegionNames == null) {
+      throw new IllegalArgumentException("Snapshot is empty");
+    }
+
+    // load table descriptor
+    HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+        snapshotDir);
+
+    Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+
+    List<InputSplit> splits = new ArrayList<InputSplit>(
+        snapshotRegionNames.size());
+    for (String regionName : snapshotRegionNames) {
+      // load region descriptor
+      Path regionDir = new Path(snapshotDir, regionName);
+      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
+
+      if (keyRangesOverlap(scan.getStartRow(), scan.getStopRow(),
+          hri.getStartKey(), hri.getEndKey())) {
+        List<String> hosts = HRegion.computeHDFSBlocksDistribution(conf, htd,
+            hri.getEncodedName()).getTopHosts();
+        splits.add(new SnapshotRegionSplit(regionName, hosts));
+      }
+    }
+
+    return splits;
+  }
+
+  private boolean keyRangesOverlap(final byte[] start1, final byte[] end1,
+      final byte[] start2, final byte[] end2) {
+    // a zero-length start or end key means the range is unbounded on that side
+    return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1,
+        end2) < 0)
+        && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
+            end1) < 0);
+  }
+
+  /**
+   * Set job input.
+   *
+   * @param job
+   *          The job
+   * @param snapshotName
+   *          The snapshot name
+   * @param tableRootDir
+   *          The directory where the temp table will be created
+   * @throws IOException
+   *           on error
+   */
+  public static void setInput(final Job job, final String snapshotName,
+      final Path tableRootDir) throws IOException {
+    Configuration conf = job.getConfiguration();
+    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+        snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils
+        .readSnapshotInfo(fs, snapshotDir);
+
+    // load table descriptor
+    HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
+        snapshotDir);
+
+    Path tableDir = new Path(tableRootDir, htd.getNameAsString());
+    conf.set(TABLE_DIR_KEY, tableDir.toString());
+
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Restoring snapshot '" + snapshotName + "' to directory " + tableDir);
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+    RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs,
+        snapshotDesc, snapshotDir, htd, tableDir, monitor, status);
+    helper.restoreHdfsRegions();
+  }
+
+  private static String getSnapshotName(final Configuration conf) {
+    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+    if (snapshotName == null) {
+      throw new IllegalArgumentException("Snapshot name must be provided");
+    }
+    return snapshotName;
+  }
+}
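
A note on the keyRangesOverlap() helper used by getSplits() above: it relies on the HBase convention that a zero-length start or stop key means the range is unbounded on that side, which is why empty arrays short-circuit the comparisons. The snippet below restates the same predicate with concrete values; it is only an illustration (the class is hypothetical and simply duplicates the private helper), not part of the patch.

import org.apache.hadoop.hbase.util.Bytes;

/** Standalone copy of the overlap predicate from getSplits(), for illustration. */
public class KeyRangeOverlapExample {

  // Same logic as SnapshotInputFormat.keyRangesOverlap: a zero-length key
  // means the range is unbounded on that side.
  static boolean overlaps(byte[] scanStart, byte[] scanStop,
      byte[] regionStart, byte[] regionEnd) {
    return (regionEnd.length == 0 || scanStart.length == 0
            || Bytes.compareTo(scanStart, regionEnd) < 0)
        && (scanStop.length == 0 || regionStart.length == 0
            || Bytes.compareTo(regionStart, scanStop) < 0);
  }

  public static void main(String[] args) {
    byte[] empty = new byte[0];
    // Scan ["bbb", "ccc") against a region ["bba", "bbz"): overlaps.
    System.out.println(overlaps(Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
        Bytes.toBytes("bba"), Bytes.toBytes("bbz"))); // true
    // Same scan against a region ["ccc", "ddd"): no overlap, split is pruned.
    System.out.println(overlaps(Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
        Bytes.toBytes("ccc"), Bytes.toBytes("ddd"))); // false
    // Unbounded scan (empty start and stop row) matches every region.
    System.out.println(overlaps(empty, empty,
        Bytes.toBytes("zzz"), empty)); // true
  }
}

In particular, a region whose start key equals the scan's stop row is pruned, which matches the exclusive stop-row semantics of Scan.
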