diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..9f094e2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java @@ -0,0 +1,99 @@ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans configured for each. + * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for + * more details. + * + + *

+ * Usage is similar to TableSnapshotInputFormat, with the following exception: initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables included in each snapshot/scan + * pair. + * + * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf, boolean, Path)} + * can be used to configure the job. + *

{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * 
+ *

+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * (one per region). + *

+ * + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat implements InputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List splits = + delegate.getSplits(job); + InputSplit[] results = new InputSplit[splits.size()]; + for (int i = 0; i < splits.size(); i++) { + results[i] = new TableSnapshotRegionSplit(splits.get(i)); + } + return results; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); + } + + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of restoreDir. + * + * Sets: {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}, + * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY} + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public static void setInput(Configuration conf, Map> snapshotScans, Path restoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index b5fefbb..5a8b672 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -19,8 +19,9 @@ package org.apache.hadoop.hbase.mapred; import java.io.IOException; +import java.util.Collection; +import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.Path; @@ -30,25 +31,18 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.MutationSerialization; -import org.apache.hadoop.hbase.mapreduce.ResultSerialization; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.mapreduce.*; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.security.token.Token; -import org.apache.zookeeper.KeeperException; /** * Utility for {@link TableMap} and {@link TableReduce} @@ -128,6 +122,44 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from one or more multiple table snapshots, with one or more scan per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job, + boolean addDependencyJars, Path tmpRestoreDir + ) throws IOException { + MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir); + + + job.setInputFormat(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + if (addDependencyJars) { + addDependencyJars(job); + } + + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + + + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. 
* diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java index 1c5e4bd..502ee4a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; import org.apache.hadoop.mapred.InputFormat; @@ -60,8 +61,8 @@ public class TableSnapshotInputFormat implements InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..b761eae --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans configured for each. + * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for + * more details. + * + + *

+ * Usage is similar to TableSnapshotInputFormat, with the following exception: initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables included in each snapshot/scan + * pair. + * + * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache.hadoop.fs.Path)} + * can be used to configure the job. + *

{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * 
+ *

+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * (one per region). + *

+ * + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public List getSplits(JobContext jobContext) throws IOException, InterruptedException { + List splits = delegate.getSplits(jobContext.getConfiguration()); + List rtn = Lists.newArrayListWithCapacity(splits.size()); + + for (TableSnapshotInputFormatImpl.InputSplit split : splits) { + rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split)); + } + + return rtn; + } + + + public static void setInput(Configuration configuration, Map> snapshotScans, Path tmpRestoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..171294a --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,247 @@ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.util.*; + +/** + * Shared implementation of mapreduce code over multiple table snapshots. + * + * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat} and mapred + * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. + */ +public class MultiTableSnapshotInputFormatImpl { + + private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormat.class); + + // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet, nor is it valid for paths + public static final char KVP_DELIMITER = '^'; + + public static final String RESTORE_DIRS_KEY = "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; + public static final String SNAPSHOT_TO_SCANS_KEY = "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of restoreDir. 
+ * + * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public void setInput(Configuration conf, Map> snapshotScans, Path restoreDir) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + setSnapshotToScans(conf, snapshotScans); + Map restoreDirs = generateSnapshotToRestoreDir(snapshotScans.keySet(), restoreDir); + setSnapshotDirs(conf, restoreDirs); + restoreSnapshots(conf, restoreDirs, fs); + } + + /** + * Return the list of splits extracted from the scans/snapshots pushed to conf by {@link #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)} + * @param conf + * @return + * @throws IOException + */ + public List getSplits(Configuration conf) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + List rtn = Lists.newArrayList(); + + Map> snapshotsToScans = getSnapshotsToScans(conf); + Map snapshotsToRestoreDirs = getSnapshotDirs(conf); + for (Map.Entry> entry : snapshotsToScans.entrySet()) { + String snapshotName = entry.getKey(); + + + Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); + + SnapshotManifest manifest = TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); + List regionInfos = TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); + + for (Scan scan : entry.getValue()) { + List splits = TableSnapshotInputFormatImpl.getSplits( + scan, + manifest, + regionInfos, + restoreDir, + conf); + for (TableSnapshotInputFormatImpl.InputSplit split : splits) { + rtn.add(split); + } + } + } + return rtn; + } + + /** + * Retrieve the snapshot name -> list mapping pushed to configuration by {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)} + * @param conf + * @return + * @throws IOException + */ + public Map> getSnapshotsToScans(Configuration conf) throws IOException { + + Map> rtn = Maps.newHashMap(); + + for (Map.Entry entry : getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) { + String snapshotName = entry.getKey(); + String scan = entry.getValue(); + + Collection snapshotScans = rtn.get(snapshotName); + if (snapshotScans == null) { + snapshotScans = Lists.newArrayList(); + rtn.put(snapshotName, snapshotScans); + } + + snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); + } + + return rtn; + } + + /** + * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) + * @param conf + * @param snapshotScans + * @throws IOException + */ + public void setSnapshotToScans(Configuration conf, Map> snapshotScans) throws IOException { + // flatten out snapshotScans for serialization to the job conf + List> snapshotToSerializedScans = Lists.newArrayList(); + + for (Map.Entry> entry : snapshotScans.entrySet()) { + String snapshotName = entry.getKey(); + Collection scans = entry.getValue(); + + // serialize all scans and map them to the appropriate snapshot + for (Scan scan : scans) { + snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>( + snapshotName, TableMapReduceUtil.convertScanToString(scan))); + } + } + + setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); + } + + /** + * Retrieve the directories into which snapshots have been restored from ({@link #RESTORE_DIRS_KEY}) + * @param conf + * @return + * @throws IOException + */ + public Map getSnapshotDirs(Configuration conf) throws IOException { + 
List> kvps = getKeyValues(conf, RESTORE_DIRS_KEY); + Map rtn = Maps.newHashMapWithExpectedSize(kvps.size()); + + for (Map.Entry kvp : kvps) { + rtn.put(kvp.getKey(), new Path(kvp.getValue())); + } + + return rtn; + } + + public void setSnapshotDirs(Configuration conf, Map snapshotDirs) { + Map toSet = Maps.newHashMap(); + + for (Map.Entry entry : snapshotDirs.entrySet()) { + toSet.put(entry.getKey(), entry.getValue().toString()); + } + + setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); + } + + /** + * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and return a map from the snapshot + * to the restore directory. + * @param snapshots + * @param baseRestoreDir + * @return + */ + private Map generateSnapshotToRestoreDir(Collection snapshots, Path baseRestoreDir) { + Map rtn = Maps.newHashMap(); + + for (String snapshotName : snapshots) { + Path restoreSnapshotDir = new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); + rtn.put(snapshotName, restoreSnapshotDir); + } + + return rtn; + } + + /** + * Restore each (snapshot name, restore directory) pair in snapshotToDir + * @param conf + * @param snapshotToDir + * @param fs + * @throws IOException + */ + public void restoreSnapshots(Configuration conf, Map snapshotToDir, FileSystem fs) throws IOException { + // TODO: restore from record readers to parallelize. + Path rootDir = FSUtils.getRootDir(conf); + + for (Map.Entry entry : snapshotToDir.entrySet()) { + String snapshotName = entry.getKey(); + Path restoreDir = entry.getValue(); + LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir + " for MultiTableSnapshotInputFormat"); + restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); + } + } + + void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, FileSystem fs) throws IOException { + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + } + + + // TODO: these probably belong elsewhere/may already be implemented elsewhere. 
+ + /** + * Store a collection of Map.Entry's in conf, with each entry separated by ',' and key values delimited by ':' + * @param conf + * @param key + * @param keyValues + */ + private static void setKeyValues(Configuration conf, String key, Collection> keyValues) { + List serializedKvps = Lists.newArrayList(); + + for (Map.Entry kvp : keyValues) { + serializedKvps.add(kvp.getKey() + KVP_DELIMITER + kvp.getValue()); + } + + conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()])); + } + + private static List> getKeyValues(Configuration conf, String key) { + String[] kvps = conf.getStrings(key); + + List> rtn = Lists.newArrayList(); + + for (String kvp : kvps) { + String[] splitKvp = StringUtils.split(kvp, KVP_DELIMITER); + + if (splitKvp.length != 2) { + throw new IllegalArgumentException("Expected key value pair for configuration key '" + key + "'" + + " to be of form '" + KVP_DELIMITER + "; was " + kvp + " instead"); + } + + rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1])); + } + return rtn; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 149752b..e358453 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -22,13 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.URL; import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; @@ -307,6 +301,46 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from one or more multiple table snapshots, with one or more scan per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Path tmpRestoreDir + ) throws IOException { + MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); + + job.setInputFormatClass(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + + if (addDependencyJars) { + addDependencyJars(job); + addDependencyJars(job.getConfiguration(), MetricsRegistry.class); + } + + resetCacheConfig(job.getConfiguration()); + } + + /** * Sets up the job for reading from a table snapshot. 
It bypasses hbase servers * and read directly from snapshot files. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index 44d88c5..de0f69b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.Writable; @@ -98,8 +99,8 @@ public class TableSnapshotInputFormat extends InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 8496868..a6b3306 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.mapreduce; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -35,11 +38,13 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit; +import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos; import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.Writable; @@ -61,9 +66,12 @@ public class TableSnapshotInputFormatImpl { // TODO: Snapshots files are owned in fs by the hbase user. There is no // easy way to delegate access. 
+ public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class); + + private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; // key for specifying the root dir of the restored snapshot - private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; + protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ private static final String LOCALITY_CUTOFF_MULTIPLIER = @@ -74,14 +82,17 @@ public class TableSnapshotInputFormatImpl { * Implementation class for InputSplit logic common between mapred and mapreduce. */ public static class InputSplit implements Writable { + private HTableDescriptor htd; private HRegionInfo regionInfo; private String[] locations; + private String scan; + private String restoreDir; // constructor for mapreduce framework / Writable public InputSplit() {} - public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations) { + public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations, Scan scan, Path restoreDir) { this.htd = htd; this.regionInfo = regionInfo; if (locations == null || locations.isEmpty()) { @@ -89,6 +100,25 @@ public class TableSnapshotInputFormatImpl { } else { this.locations = locations.toArray(new String[locations.size()]); } + try { + this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : ""; + } catch (IOException e) { + LOG.warn("Failed to convert Scan to String", e); + } + + this.restoreDir = restoreDir.toString(); + } + + public HTableDescriptor getHtd() { + return htd; + } + + public String getScan() { + return scan; + } + + public String getRestoreDir() { + return restoreDir; } public long getLength() { @@ -128,6 +158,10 @@ public class TableSnapshotInputFormatImpl { byte[] buf = baos.toByteArray(); out.writeInt(buf.length); out.write(buf); + + Bytes.writeByteArray(out, Bytes.toBytes(scan)); + Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); + } @Override @@ -140,6 +174,9 @@ public class TableSnapshotInputFormatImpl { this.regionInfo = HRegionInfo.convert(split.getRegion()); List locationsList = split.getLocationsList(); this.locations = locationsList.toArray(new String[locationsList.size()]); + + this.scan = Bytes.toString(Bytes.readByteArray(in)); + this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); } } @@ -158,28 +195,12 @@ public class TableSnapshotInputFormatImpl { } public void initialize(InputSplit split, Configuration conf) throws IOException { + this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); this.split = split; HTableDescriptor htd = split.htd; HRegionInfo hri = this.split.getRegionInfo(); FileSystem fs = FSUtils.getCurrentFileSystem(conf); - Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root - // directory where snapshot was restored - - // create scan - // TODO: mapred does not support scan as input API. Work around for now. 
- if (conf.get(TableInputFormat.SCAN) != null) { - scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); - } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { - String[] columns = - conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); - scan = new Scan(); - for (String col : columns) { - scan.addFamily(Bytes.toBytes(col)); - } - } else { - throw new IllegalArgumentException("A Scan is not configured for this job"); - } // region is immutable, this should be fine, // otherwise we have to set the thread read point @@ -187,7 +208,7 @@ public class TableSnapshotInputFormatImpl { // disable caching of data blocks scan.setCacheBlocks(false); - scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); + scanner = new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); } public boolean nextKeyValue() throws IOException { @@ -233,18 +254,39 @@ public class TableSnapshotInputFormatImpl { Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); - Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); - SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); - SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); + + List regionInfos = getRegionInfosFromManifest(manifest); + + // TODO: mapred does not support scan as input API. Work around for now. + Scan scan = extractScanFromConf(conf); + // the temp dir where the snapshot is restored + Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + + return getSplits(scan, manifest, regionInfos, restoreDir, conf); + } + + public static List getRegionInfosFromManifest(SnapshotManifest manifest) { List regionManifests = manifest.getRegionManifests(); if (regionManifests == null) { throw new IllegalArgumentException("Snapshot seems empty"); } - // load table descriptor - HTableDescriptor htd = manifest.getTableDescriptor(); + List regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); - // TODO: mapred does not support scan as input API. Work around for now. 
+ for (SnapshotRegionManifest regionManifest : regionManifests) { + regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo())); + } + return regionInfos; + } + + public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, Path rootDir, FileSystem fs) throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + } + + public static Scan extractScanFromConf(Configuration conf) throws IOException { Scan scan = null; if (conf.get(TableInputFormat.SCAN) != null) { scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); @@ -258,29 +300,39 @@ public class TableSnapshotInputFormatImpl { } else { throw new IllegalArgumentException("Unable to create scan"); } - // the temp dir where the snapshot is restored - Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + return scan; + } + + public static List getSplits(Scan scan, + SnapshotManifest manifest, + List regionManifests, + Path restoreDir, + Configuration conf + ) throws IOException { + // load table descriptor + HTableDescriptor htd = manifest.getTableDescriptor(); + Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); List splits = new ArrayList(); - for (SnapshotRegionManifest regionManifest : regionManifests) { + for (HRegionInfo hri : regionManifests) { // load region descriptor - HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { + hri.getStartKey(), hri.getEndKey())) { // compute HDFS locations from snapshot files (which will get the locations for // referred hfiles) List hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); int len = Math.min(3, hosts.size()); hosts = hosts.subList(0, len); - splits.add(new InputSplit(htd, hri, hosts)); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } return splits; + } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..f526509 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,107 @@ +package org.apache.hadoop.hbase.mapred; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.*; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestMultiTableSnapshotInputFormat 
extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat { + + private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class); + + @Override + protected void runJob(String jobName, Configuration c, List scans) throws IOException, InterruptedException, ClassNotFoundException { + JobConf job = new JobConf(TEST_UTIL.getConfiguration()); + + job.setJobName(jobName); + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + TableMapReduceUtil.initMultiTableSnapshotMapperJob( + getSnapshotScanMapping(scans), Mapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, + true, restoreDir + ); + + TableMapReduceUtil.addDependencyJars(job); + + job.setReducerClass(Reducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + + RunningJob runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + assertTrue(runningJob.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper implements TableMap { + + @Override + public void map(ImmutableBytesWritable key, Result value, OutputCollector outputCollector, Reporter reporter) throws IOException { + makeAssertions(key, value); + outputCollector.collect(key, key); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf jobConf) { + + } + } + + public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements org.apache.hadoop.mapred.Reducer { + + private JobConf jobConf; + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, OutputCollector outputCollector, Reporter reporter) throws IOException { + makeAssertions(key, Lists.newArrayList(values)); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. 
+ * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + super.cleanup(this.jobConf); + } + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java new file mode 100644 index 0000000..54406b6 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java @@ -0,0 +1,252 @@ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base set of tests and setup for input formats touching multiple tables. 
+ */ +public abstract class MultiTableInputFormatTestBase { + static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); + public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final String TABLE_NAME = "scantest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + static List TABLES = Lists.newArrayList(); + + static { + for (int i = 0; i < 3; i++) { + TABLES.add(TABLE_NAME + String.valueOf(i)); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + for (String tableName : TABLES) { + try (HTable table = + TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), + INPUT_FAMILY, 4)) { + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + } + } + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); + } + + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + @Test + public void testScanOBBToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + @Test + public void testScanYZYToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = + "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? 
last : ""); + + List scans = new ArrayList(); + + for(String tableName : TABLES){ + Scan scan = new Scan(); + + scan.addFamily(INPUT_FAMILY); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); + + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + + scans.add(scan); + + LOG.info("scan before: " + scan); + } + + runJob(jobName, c, scans); + } + + protected void runJob(String jobName, Configuration c, List scans) throws IOException, InterruptedException, ClassNotFoundException { + Job job = new Job(c, jobName); + + initJob(scans, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + protected abstract void initJob(List scans, Job job) throws IOException; + + /** + * Pass the key and value to reducer. + */ + public static class ScanMapper extends + TableMapper { + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws java.io.IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + makeAssertions(key, value); + context.write(key, key); + } + + public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> cf = + value.getMap(); + if (!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + } + } + + /** + * Checks the last and first keys seen against the scanner boundaries. 
+ */ + public static class ScanReducer + extends + Reducer { + private String first = null; + private String last = null; + + @Override + protected void reduce(ImmutableBytesWritable key, + Iterable values, Context context) + throws IOException, InterruptedException { + makeAssertions(key, values); + } + + protected void makeAssertions(ImmutableBytesWritable key, Iterable values) { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.debug("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + assertEquals(3, count); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + Configuration c = context.getConfiguration(); + cleanup(c); + } + + protected void cleanup(Configuration c) { + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java index 3226cc6..d0c9d99 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java @@ -57,196 +57,16 @@ import org.junit.experimental.categories.Category; * too. */ @Category({VerySlowMapReduceTests.class, LargeTests.class}) -public class TestMultiTableInputFormat { - - static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - static final String TABLE_NAME = "scantest"; - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final String KEY_STARTROW = "startRow"; - static final String KEY_LASTROW = "stpRow"; +public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase { @BeforeClass - public static void setUpBeforeClass() throws Exception { - // switch TIF to log at DEBUG level + public static void setupLogging() { TEST_UTIL.enableDebug(MultiTableInputFormat.class); - TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); - // start mini hbase cluster - TEST_UTIL.startMiniCluster(3); - // create and fill table - for (int i = 0; i < 3; i++) { - try (HTable table = - TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)), - INPUT_FAMILY, 4)) { - TEST_UTIL.loadTable(table, INPUT_FAMILY, false); - } - } - // start MR cluster - TEST_UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniMapReduceCluster(); - TEST_UTIL.shutdownMiniCluster(); - } - - @After - public void tearDown() throws Exception { - Configuration c = TEST_UTIL.getConfiguration(); - FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); } - /** - * Pass the key and value to reducer. - */ - public static class ScanMapper extends - TableMapper { - /** - * Pass the key and value to reduce. - * - * @param key The key, here "aaa", "aab" etc. 
- * @param value The value is the same as the key. - * @param context The task context. - * @throws IOException When reading the rows fails. - */ - @Override - public void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> cf = - value.getMap(); - if (!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); - LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + - ", value -> " + val); - context.write(key, key); - } - } - - /** - * Checks the last and first keys seen against the scanner boundaries. - */ - public static class ScanReducer - extends - Reducer { - private String first = null; - private String last = null; - - @Override - protected void reduce(ImmutableBytesWritable key, - Iterable values, Context context) - throws IOException, InterruptedException { - int count = 0; - for (ImmutableBytesWritable value : values) { - String val = Bytes.toStringBinary(value.get()); - LOG.debug("reduce: key[" + count + "] -> " + - Bytes.toStringBinary(key.get()) + ", value -> " + val); - if (first == null) first = val; - last = val; - count++; - } - assertEquals(3, count); - } - - @Override - protected void cleanup(Context context) throws IOException, - InterruptedException { - Configuration c = context.getConfiguration(); - String startRow = c.get(KEY_STARTROW); - String lastRow = c.get(KEY_LASTROW); - LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + - startRow + "\""); - LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + - "\""); - if (startRow != null && startRow.length() > 0) { - assertEquals(startRow, first); - } - if (lastRow != null && lastRow.length() > 0) { - assertEquals(lastRow, last); - } - } - } - - @Test - public void testScanEmptyToEmpty() throws IOException, InterruptedException, - ClassNotFoundException { - testScan(null, null, null); - } - - @Test - public void testScanEmptyToAPP() throws IOException, InterruptedException, - ClassNotFoundException { - testScan(null, "app", "apo"); - } - - @Test - public void testScanOBBToOPP() throws IOException, InterruptedException, - ClassNotFoundException { - testScan("obb", "opp", "opo"); - } - - @Test - public void testScanYZYToEmpty() throws IOException, InterruptedException, - ClassNotFoundException { - testScan("yzy", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScan(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = - "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + - (stop != null ? stop.toUpperCase() : "Empty"); - LOG.info("Before map/reduce startup - job " + jobName); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - - c.set(KEY_STARTROW, start != null ? start : ""); - c.set(KEY_LASTROW, last != null ? 
last : ""); - - List scans = new ArrayList(); - - for(int i=0; i<3; i++){ - Scan scan = new Scan(); - - scan.addFamily(INPUT_FAMILY); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i)); - - if (start != null) { - scan.setStartRow(Bytes.toBytes(start)); - } - if (stop != null) { - scan.setStopRow(Bytes.toBytes(stop)); - } - - scans.add(scan); - - LOG.info("scan before: " + scan); - } - - Job job = new Job(c, jobName); - + @Override + protected void initJob(List scans, Job job) throws IOException { TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - job.setReducerClass(ScanReducer.class); - job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - LOG.info("Started " + job.getJobName()); - job.waitForCompletion(true); - assertTrue(job.isSuccessful()); - LOG.info("After map/reduce completion - job " + jobName); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..924e53a --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,79 @@ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.base.Function; +import com.google.common.collect.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase { + + protected Path restoreDir; + + @BeforeClass + public static void setUpSnapshots() throws Exception { + + TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class); + TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class); + + // take a snapshot of every table we have. 
+ for (String tableName : TABLES) { + SnapshotTestingUtils.createSnapshotAndValidate( + TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName), + ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null, + snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()), TEST_UTIL.getTestFileSystem(), true); + } + } + + @Before + public void setUp() throws Exception { + this.restoreDir = new Path("/tmp"); + + } + + @Override + protected void initJob(List scans, Job job) throws IOException { + TableMapReduceUtil.initMultiTableSnapshotMapperJob( + getSnapshotScanMapping(scans), ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, + true, restoreDir + ); + } + + protected Map> getSnapshotScanMapping(final List scans) { + return Multimaps.index(scans, new Function() { + @Nullable + @Override + public String apply(Scan input) { + return snapshotNameForTable(Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME))); + } + }).asMap(); + } + + public static String snapshotNameForTable(String tableName) { + return tableName + "_snapshot"; + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..d7867bb --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,157 @@ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.*; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; + +@Category({SmallTests.class}) +public class TestMultiTableSnapshotInputFormatImpl { + + private MultiTableSnapshotInputFormatImpl subject; + private Map> snapshotScans; + private Path restoreDir; + private Configuration conf; + private Path rootDir; + + @Before + public void setUp() throws Exception { + this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl()); + + // mock out restoreSnapshot + // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper dependency into the + // input format. However, we need a new RestoreSnapshotHelper per snapshot in the current design, and it *also* + // feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would probably be the more "pure" + // way of doing things. This is the lesser of two evils, perhaps? + doNothing().when(this.subject). 
+ restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class), any(Path.class), any(FileSystem.class)) ; + + this.conf = new Configuration(); + this.rootDir = new Path("file:///test-root-dir"); + FSUtils.setRootDir(conf, rootDir); + this.snapshotScans = ImmutableMap.>of( + "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), + "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")), + new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))) + ); + + this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir"); + + } + + public void callSetInput() throws IOException { + subject.setInput(this.conf, snapshotScans, restoreDir); + } + + public Map> toScanWithEquals(Map> snapshotScans) throws IOException { + Map> rtn = Maps.newHashMap(); + + for (Map.Entry> entry : snapshotScans.entrySet()) { + List scans = Lists.newArrayList(); + + for (Scan scan : entry.getValue()) { + scans.add(new ScanWithEquals(scan)); + } + rtn.put(entry.getKey(), scans); + } + + return rtn; + } + + public static class ScanWithEquals { + + private final String startRow; + private final String stopRow; + + /** + * Creates a new instance of this class while copying all values. + * + * @param scan The scan instance to copy from. + * @throws java.io.IOException When copying the values fails. + */ + public ScanWithEquals(Scan scan) throws IOException { + this.startRow = Bytes.toStringBinary(scan.getStartRow()); + this.stopRow = Bytes.toStringBinary(scan.getStopRow()); + } + + @Override + public boolean equals(Object obj) { + if (! (obj instanceof ScanWithEquals)) { + return false; + } + ScanWithEquals otherScan = (ScanWithEquals) obj; + return Objects.equals(this.startRow, otherScan.startRow) && Objects.equals(this.stopRow, otherScan.stopRow); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("startRow", startRow) + .add("stopRow", stopRow) + .toString(); + } + } + + @Test + public void testSetInputSetsSnapshotToScans() throws Exception { + + callSetInput(); + + Map> actual = subject.getSnapshotsToScans(conf); + + // convert to scans we can use .equals on + Map> actualWithEquals = toScanWithEquals(actual); + Map> expectedWithEquals = toScanWithEquals(snapshotScans); + + assertEquals(expectedWithEquals, actualWithEquals); + } + + @Test + public void testSetInputPushesRestoreDirectories() throws Exception { + callSetInput(); + + Map restoreDirs = subject.getSnapshotDirs(conf); + + assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet()); + } + + @Test + public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception { + callSetInput(); + + Map restoreDirs = subject.getSnapshotDirs(conf); + + for (Path snapshotDir : restoreDirs.values()) { + assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir, snapshotDir.getParent()); + } + } + + @Test + public void testSetInputRestoresSnapshots() throws Exception { + callSetInput(); + + Map snapshotDirs = subject.getSnapshotDirs(conf); + + for (Map.Entry entry : snapshotDirs.entrySet()) { + verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir), eq(entry.getValue()), any(FileSystem.class)); + } + } +}
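
Usage note (not part of the patch): below is a minimal, self-contained sketch of driving the new mapreduce API added above (TableMapReduceUtil.initMultiTableSnapshotMapperJob). The snapshot names, the restore directory, and the MyMapper/MultiSnapshotScanJob classes are hypothetical placeholders; the sketch assumes the snapshots already exist and that the job user has read access to the HBase root directory, per the permissioning caveats noted in TableSnapshotInputFormat.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Map;

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class MultiSnapshotScanJob {

      // Hypothetical mapper: receives one (row key, Result) pair per row across all snapshot/scan pairs.
      public static class MyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
          context.write(key, key); // e.g. just echo the row key
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "multi-snapshot-scan");
        job.setJarByClass(MultiSnapshotScanJob.class);

        // One or more scans per snapshot; each snapshot/scan pair contributes its overlapping regions to the input.
        Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
            "snapshot1", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
            "snapshot2", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

        // Each snapshot is restored into a randomly named subdirectory of this path.
        Path restoreDir = new Path("/tmp/snapshot_restore_dir");

        TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyMapper.class,
            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

        // Map-only job that discards output; a real job would configure a reducer and an output format.
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The mapred variant added in this patch is analogous: build the same snapshot-to-scans map and call org.apache.hadoop.hbase.mapred.TableMapReduceUtil.initMultiTableSnapshotMapperJob with a JobConf instead of a Job.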