Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java (revision 0) @@ -0,0 +1,191 @@ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + *

+ * Tests scanning a snapshot with various scan start and stop row scenarios.
+ * Each boundary is set on a Scan and verified in a MapReduce job to confirm
+ * that it is handed over and applied correctly.
+ *

+ */ +@Category(LargeTests.class) +public class TestSnapshotInputFormatScan { + + static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); + static final byte[] SNAPSHOT_NAME = Bytes.toBytes("scantest_snaphot"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + private static HTable table = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // config snapshot support + TEST_UTIL.getConfiguration().setBoolean( + SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean( + "hbase.master.enabletable.roundrobin", true); + + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(SnapshotInputFormat.class); + + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + + // create and fill table + table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); + TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); + TEST_UTIL.loadTable(table, INPUT_FAMILY); + TEST_UTIL.getHBaseAdmin().snapshot(SNAPSHOT_NAME, TABLE_NAME); + + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBA() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "bba", "baz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBB() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "bbb", "bba"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ? last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + + FileSystem fs = FileSystem.get(c); + Path tmpDir = new Path("/testtmp"); + fs.mkdirs(tmpDir); + try { + TableMapReduceUtil.initSnapshotMapperJob(Bytes.toString(SNAPSHOT_NAME), + tmpDir, scan, TestTableInputFormatScanBase.ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, + false); + job.setReducerClass(TestTableInputFormatScanBase.ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion - job " + jobName); + } finally { + fs.delete(tmpDir, true); + } + + } + +} Property changes on: src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java ___________________________________________________________________ Added: svn:mime-type + text/plain Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1498571) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy) @@ -283,6 +283,53 @@ } } + /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly from snapshot + * files. + * + * @param snapshotName + * The name of the snapshot (of a table) to read from. + * @param tableRootDir + * The directory where the temp table will be created + * @param scan + * The scan instance with the columns, time range etc. + * @param mapper + * The mapper class to use. + * @param outputKeyClass + * The class of the output key. + * @param outputValueClass + * The class of the output value. + * @param job + * The current job to adjust. Make sure the passed job is carrying all necessary HBase configuration. + * @param addDependencyJars + * upload HBase jars and jars for any of the configured job classes via the distributed cache (tmpjars). + * @throws IOException + * When setting up the details fails. 
+ * @see SnapshotInputFormat + */ + public static void initSnapshotMapperJob(String snapshotName, Path tableRootDir, Scan scan, + Class mapper, Class outputKeyClass, Class outputValueClass, + Job job, boolean addDependencyJars) throws IOException { + + SnapshotInputFormat.setInput(job, snapshotName, tableRootDir); + + Configuration conf = job.getConfiguration(); + + job.setInputFormatClass(SnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + conf.set(TableInputFormat.SCAN, convertScanToString(scan)); + + if (addDependencyJars) { + TableMapReduceUtil.addDependencyJars(job); + } + } + public static void initCredentials(Job job) throws IOException { if (User.isHBaseSecurityEnabled(job.getConfiguration())) { try { Index: src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java (revision 0) @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * SnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses HBase servers, and + * directly accesses the underlying files (hfile, recovered edits, hlogs, etc) directly to provide maximum performance. + * The snapshot is not required to be restored or cloned. This also allows to run the mapreduce job from an online or + * offline hbase cluster. The snapshot files can be exported by using the ExportSnapshot tool, to a pure-hdfs cluster, + * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. + *
+ * <p>
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initSnapshotMapperJob(String, Path, Scan, Class, Class, Class, Job, boolean)}
+ * can be used to configure the job.
+ *
+ *

+ * <pre>
+ * {@code
+ *     Job job = new Job(conf);
+ *     Scan scan = new Scan();
+ *     TableMapReduceUtil.initSnapshotMapperJob(snapshotName, tableRootDir, scan,
+ *             MyTableMapper.class, MyMapKeyOutput.class,
+ *             MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the supplied table root directory. As with
+ * TableInputFormat, an InputSplit is created per region. Each RecordReader opens its region for reading
+ * and uses an internal RegionScanner to execute the Scan obtained from the user.
+ *
+ * <p>
+ * The user has to have sufficient access rights in the file system to access the snapshot files, and referenced files. + */ +public final class SnapshotInputFormat extends InputFormat { + // TODO: Snapshots files are owned in fs by the hbase user. There is no + // easy way to delegate access. + + private static final String SNAPSHOT_NAME_KEY = "hbase.SnapshotInputFormat.snapshot.name"; + private static final String TABLE_DIR_KEY = "hbase.SnapshotInputFormat.table.dir"; + + /** + * Snapshot region split. + */ + public static final class SnapshotRegionSplit extends InputSplit implements Writable { + private String regionName; + private String[] locations; + + /** + * Constructor for serialization. + */ + public SnapshotRegionSplit() { + } + + /** + * Constructor. + * + * @param regionName + * Region name + * @param locationList + * List of nodes with the region's HDFS blocks, in descending order of weight + */ + public SnapshotRegionSplit(final String regionName, final List locationList) { + this.regionName = regionName; + + // only use the top 3 nodes + List list = locationList.size() > 3 ? locationList.subList(0, 3) : locationList; + this.locations = list.toArray(new String[list.size()]); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return locations.length; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return locations; + } + + @Override + public void write(final DataOutput out) throws IOException { + // locations don't need to be serialized + Text.writeString(out, regionName); + } + + @Override + public void readFields(final DataInput in) throws IOException { + // locations don't need to be serialized + regionName = Text.readString(in); + } + } + + /** + * Snapshot region record reader. 
+ */ + public static final class SnapshotRegionRecordReader extends RecordReader { + private SnapshotRegionSplit split; + private HRegion region; + private Scan scan; + private RegionScanner scanner; + private List values; + private Result result = null; + private ImmutableBytesWritable row = null; + private boolean more; + + @Override + public void initialize(final InputSplit aSplit, final TaskAttemptContext context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + this.split = (SnapshotRegionSplit) aSplit; + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + String snapshotName = getSnapshotName(conf); + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + + // load region descriptor + String regionName = this.split.regionName; + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir); + + // create scan + scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine, + // otherwise we have to set the thread read point + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, snapshotDir); + Path tableDir = new Path(conf.get(TABLE_DIR_KEY)); + + // open region from the snapshot directory + this.region = openRegion(tableDir, fs, conf, hri, htd); + + // create region scanner + this.scanner = region.getScanner(scan); + values = new ArrayList(); + this.more = true; + + region.startRegionOperation(); + } + + private HRegion openRegion(final Path tableDir, final FileSystem fs, final Configuration conf, + final HRegionInfo hri, final HTableDescriptor htd) throws IOException { + HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null); + r.initialize(null); + return r; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + values.clear(); + // RegionScanner.next() has a different contract than RecordReader.nextKeyValue(). Scanner + // indicates no value read by returning empty results. 
Returns boolean indicates if more + // rows exist AFTER this one + if (!more) { + return false; + } + more = scanner.nextRaw(values, scan.getBatch(), null); + if (values == null || values.isEmpty()) { + // we are done + return false; + } + + this.result = new Result(values); + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + + return true; + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return row; + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return result; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + if (this.scanner != null) { + this.scanner.close(); + } + if (region != null) { + region.closeRegionOperation(); + region.close(true); + } + } + } + + @Override + public RecordReader createRecordReader(final InputSplit split, + final TaskAttemptContext context) throws IOException { + return new SnapshotRegionRecordReader(); + } + + @Override + public List getSplits(final JobContext job) throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + String snapshotName = getSnapshotName(job.getConfiguration()); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + + Set snapshotRegionNames = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir); + if (snapshotRegionNames == null) { + throw new IllegalArgumentException("Snapshot is empty"); + } + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, snapshotDir); + + Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + + List splits = new ArrayList(snapshotRegionNames.size()); + for (String regionName : snapshotRegionNames) { + // load region descriptor + Path regionDir = new Path(snapshotDir, regionName); + HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir); + + if (keyRangesOverlap(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), hri.getEndKey())) { + List hosts = + HRegion.computeHDFSBlocksDistribution(conf, htd, hri.getEncodedName()).getTopHosts(); + splits.add(new SnapshotRegionSplit(regionName, hosts)); + } + + } + + return splits; + } + + private boolean keyRangesOverlap(final byte[] start1, final byte[] end1, final byte[] start2, final byte[] end2) { + return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1, end2) < 0) + && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, end1) < 0); + } + + /** + * Set job input. 
+ * + * @param job + * The job + * @param snapshotName + * The snapshot name + * @param tableRootDir + * The directory where the temp table will be created + * @throws IOException + * on error + */ + public static void setInput(final Job job, final String snapshotName, final Path tableRootDir) throws IOException { + Configuration conf = job.getConfiguration(); + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(conf); + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + + // load table descriptor + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, snapshotDir); + + Path tableDir = new Path(tableRootDir, htd.getNameAsString()); + conf.set(TABLE_DIR_KEY, tableDir.toString()); + + MonitoredTask status = + TaskMonitor.get().createStatus("Restoring snapshot '" + snapshotName + "' to directory " + tableDir); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(); + + RestoreSnapshotHelper helper = + new RestoreSnapshotHelper(conf, fs, snapshotDesc, snapshotDir, htd, tableDir, monitor, status); + helper.restoreHdfsRegions(); // TODO: restore from record readers to parallelize? + } + + private static String getSnapshotName(final Configuration conf) { + String snapshotName = conf.get(SNAPSHOT_NAME_KEY); + if (snapshotName == null) { + throw new IllegalArgumentException("Snapshot name must be provided"); + } + return snapshotName; + } + +} Property changes on: src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java ___________________________________________________________________ Added: svn:mime-type + text/plain
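Reviewer note: for anyone who wants to exercise the new API end to end, a minimal driver might look like the sketch below. This is only an illustration pieced together from the testScan() flow and the initSnapshotMapperJob(String, Path, Scan, Class, Class, Class, Job, boolean) signature added in this patch; the driver class, the RowKeyMapper, the "contents" family and the command-line argument layout are invented for the example and are not part of the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Hypothetical driver: scans a table snapshot directly from HDFS and emits the row keys it sees. */
public class SnapshotScanDriver {

  /** Hypothetical mapper; writes each row key once per Result returned by the snapshot scan. */
  static class RowKeyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, key);
    }
  }

  public static void main(String[] args) throws Exception {
    // args[0] = snapshot name, args[1] = temp table root dir, args[2] = job output dir (all hypothetical)
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-scan-" + args[0]);
    job.setJarByClass(SnapshotScanDriver.class);

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents"));   // same family name the test table uses
    scan.setCacheBlocks(false);                  // no region server cache is involved in a snapshot scan

    // The new helper wires up SnapshotInputFormat, the serialized scan and the mapper;
    // setInput() restores the snapshot's regions under the given table root directory.
    TableMapReduceUtil.initSnapshotMapperJob(args[0], new Path(args[1]), scan,
        RowKeyMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class,
        job, true);

    job.setNumReduceTasks(0);                    // map-only: results go straight to the output dir
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

As the test does with its /testtmp directory, the caller should remove the table root directory once the job completes, since setInput() restores the snapshot files there rather than cleaning them up itself.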