Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java (revision 0)
@@ -0,0 +1,196 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * <p>
+ * Tests scanning a snapshot. Tests various scan start and stop row scenarios.
+ * These are set on a scan and verified in a MapReduce job, to check that they
+ * are handed over and applied properly there too.
+ * </p>
+ */
+@Category(LargeTests.class)
+public class TestSnapshotInputFormatScan {
+
+  static final Log LOG = LogFactory.getLog(TestSnapshotInputFormatScan.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+  static final byte[] SNAPSHOT_NAME = Bytes.toBytes("scantest_snapshot");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "stpRow";
+
+  private static HTable table = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+    // this turns it off for this test.  TODO: Figure out why scr breaks recovery.
+    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+    // config snapshot support
+    TEST_UTIL.getConfiguration().setBoolean(
+        SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+    TEST_UTIL.getConfiguration().setBoolean(
+        "hbase.master.enabletable.roundrobin", true);
+
+    // switch SnapshotInputFormat to log at DEBUG level
+    TEST_UTIL.enableDebug(SnapshotInputFormat.class);
+
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+
+    // create and fill table, then take the snapshot the tests read from
+    table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+    TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    TEST_UTIL.getHBaseAdmin().snapshot(SNAPSHOT_NAME, TABLE_NAME);
+
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests a MR scan over the complete snapshot, with neither start nor stop row set.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  /**
+   * Tests a MR scan from the start of the snapshot up to stop row "app".
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToAPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  /**
+   * Tests a MR scan from the start of the snapshot up to stop row "bba".
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBA() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "bba", "baz");
+  }
+
+  /**
+   * Tests a MR scan from the start of the snapshot up to stop row "bbb".
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToBBB() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "bbb", "bba");
+  }
+
+  /**
+   * Tests a MR scan from the start of the snapshot up to stop row "opp".
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScanEmptyToOPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "opp", "opo");
+  }
+
+  /**
+   * Runs a MR job over the snapshot using the given start and stop rows, and
+   * lets the reducer verify the first and last keys that were actually scanned.
+   *
+   * @param start The start row to set on the scan, or null for none.
+   * @param stop The (exclusive) stop row to set on the scan, or null for none.
+   * @param last The row key the reducer should see last.
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  protected void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty")
+        + "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = new Job(c, jobName);
+
+    FileSystem fs = FileSystem.get(c);
+    Path tmpDir = new Path("/" + UUID.randomUUID());
+    fs.mkdirs(tmpDir);
+    try {
+      TableMapReduceUtil.initSnapshotMapperJob(Bytes.toString(SNAPSHOT_NAME),
+          tmpDir, scan, TestTableInputFormatScanBase.ScanMapper.class,
+          ImmutableBytesWritable.class, ImmutableBytesWritable.class, job,
+          false);
+      job.setReducerClass(TestTableInputFormatScanBase.ScanReducer.class);
+      job.setNumReduceTasks(1); // one to get final "first" and "last" key
+      FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+      LOG.info("Started " + job.getJobName());
+      assertTrue(job.waitForCompletion(true));
+      LOG.info("After map/reduce completion - job " + jobName);
+    } finally {
+      fs.delete(tmpDir, true);
+    }
+  }
+
+}

Property changes on: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSnapshotInputFormatScan.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1498739)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy)
@@ -292,6 +292,53 @@
     }
   }
 
+  /**
+   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads directly from the
+   * snapshot files.
+   *
+   * @param snapshotName
+   *          The name of the snapshot (of a table) to read from.
+   * @param tableRootDir
+   *          The directory where the temporary restored table will be created.
+   * @param scan
+   *          The scan instance with the columns, time range etc.
+   * @param mapper
+   *          The mapper class to use.
+   * @param outputKeyClass
+   *          The class of the output key.
+   * @param outputValueClass
+   *          The class of the output value.
+   * @param job
+   *          The current job to adjust. Make sure the passed job is carrying all necessary HBase configuration.
+   * @param addDependencyJars
+   *          upload HBase jars and jars for any of the configured job classes via the distributed cache (tmpjars).
+   * @throws IOException
+   *           When setting up the details fails.
+   * @see SnapshotInputFormat
+   */
+  public static void initSnapshotMapperJob(String snapshotName, Path tableRootDir, Scan scan,
+      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
+      Job job, boolean addDependencyJars) throws IOException {
+
+    SnapshotInputFormat.setInput(job, snapshotName, tableRootDir);
+
+    Configuration conf = job.getConfiguration();
+
+    job.setInputFormatClass(SnapshotInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+
+    if (addDependencyJars) {
+      TableMapReduceUtil.addDependencyJars(job);
+    }
+  }
+
   public static void initCredentials(Job job) throws IOException {
     if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
       try {

Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SnapshotInputFormat.java (revision 0)
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * SnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses HBase servers
+ * and accesses the underlying files (hfiles, recovered edits, hlogs, etc.) directly, for maximum
+ * performance. The snapshot is not required to be restored or cloned first, which also makes it possible
+ * to run the MapReduce job against an online or offline HBase cluster. The snapshot files can be exported
+ * to a pure-HDFS cluster using the ExportSnapshot tool, and this InputFormat can then be used to run the
+ * MapReduce job directly over the exported snapshot files.
+ * <p>
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initSnapshotMapperJob(String, Path, Scan, Class, Class, Class, Job, boolean)}
+ * can be used to configure the job.
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initSnapshotMapperJob(snapshotName, tableRootDir, scan, MyTableMapper.class,
+ *     MyMapKeyOutput.class, MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ *
+ * Internally, this input format restores the snapshot into the supplied working directory. As with
+ * TableInputFormat, an InputSplit is created per region, and each RecordReader opens its region for
+ * reading. An internal RegionScanner is then used to execute the Scan obtained from the user.
+ * <p>
+ * The user running the job must have sufficient permissions on the file system to access the snapshot
+ * files and all files referenced by them.
+ * </p>
+ */
+public final class SnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
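
For reference, the sketch below shows how the new API added by this patch might be driven end to end from a standalone job. It is illustrative only and not part of the patch: the snapshot name "scantest_snapshot", the restore and output paths, the package name, and RowKeyMapper are all made-up placeholders; only initSnapshotMapperJob and the standard HBase/Hadoop classes it uses come from the patch and the existing APIs.

package org.apache.hadoop.hbase.mapreduce.example; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SnapshotScanDriver {

  /** Hypothetical mapper: emits each row key it sees; rows arrive straight from the snapshot files. */
  static class RowKeyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, key);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-scan");
    job.setJarByClass(SnapshotScanDriver.class);

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("contents")); // same family the test loads

    // Configure the job to read directly from the snapshot files; no region
    // servers are contacted. The Path argument is the scratch directory the
    // input format uses to restore the snapshot ("/tmp/snapshot-restore" is a
    // placeholder; it must be writable by the submitting user).
    TableMapReduceUtil.initSnapshotMapperJob("scantest_snapshot",
        new Path("/tmp/snapshot-restore"), scan, RowKeyMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true);

    job.setNumReduceTasks(0); // map-only; row keys go straight to the output files
    FileOutputFormat.setOutputPath(job, new Path("/tmp/snapshot-scan-out"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With addDependencyJars set to true, the HBase jars and the jars of the configured job classes are shipped through the distributed cache (tmpjars), since the new method delegates to TableMapReduceUtil.addDependencyJars(job) exactly as the existing initTableMapperJob overloads do.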