diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..ab27edd --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred + * .TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans + * configured for each. + * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} for + * more details. + * Usage is similar to TableSnapshotInputFormat, with the following exception: + * initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding + * scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables + * included in each snapshot/scan + * pair. + * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, + * Class, Class, Class, JobConf, boolean, Path)} + * can be used to configure the job. + *
<pre>{@code
+ * JobConf job = new JobConf(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
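The example in the block above configures the job but leaves out imports and submission. Under the assumption that this patch is applied, a complete mapred driver might look like the sketch below; MultiSnapshotMapredDriver and MyTableMapper are hypothetical names introduced only for illustration and are not part of this patch.

```java
// Hedged sketch of a mapred driver; assumes the patch is applied.
// MultiSnapshotMapredDriver and MyTableMapper are hypothetical placeholders.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MultiSnapshotMapredDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), MultiSnapshotMapredDriver.class);
    job.setJobName("multi-snapshot-scan");

    // One or more scans per snapshot; the job input is the union of all snapshot/scan pairs.
    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
        "snapshot1", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
        "snapshot2", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

    // Each snapshot is restored into a subdirectory of this path before the job runs.
    Path restoreDir = new Path("/tmp/snapshot_restore_dir");

    // Wires MultiTableSnapshotInputFormat, the mapper and the map output types onto the JobConf.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

    JobClient.runJob(job);
  }
}
```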
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp + * directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * (one per region). + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on + * permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat + implements InputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List splits = delegate.getSplits(job); + InputSplit[] results = new InputSplit[splits.size()]; + for (int i = 0; i < splits.size(); i++) { + results[i] = new TableSnapshotRegionSplit(splits.get(i)); + } + return results; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); + } + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of + * restoreDir. + * Sets: {@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}, + * {@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY} + * + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public static void setInput(Configuration conf, Map> snapshotScans, + Path restoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index 410fd31..7ab24d1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -18,19 +18,22 @@ */ package org.apache.hadoop.hbase.mapred; -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.mapreduce.MutationSerialization; +import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.mapreduce.*; import 
org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; @@ -41,6 +44,10 @@ import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + /** * Utility for {@link TableMap} and {@link TableReduce} */ @@ -120,6 +127,40 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from one or more multiple table snapshots, with one or more scans + * per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, Class outputKeyClass, Class outputValueClass, + JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir); + + job.setInputFormat(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + if (addDependencyJars) { + addDependencyJars(job); + } + + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. 
* @@ -319,16 +360,17 @@ public class TableMapReduceUtil { public static void addDependencyJars(JobConf job) throws IOException { org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars( - job, - // when making changes here, consider also mapreduce.TableMapReduceUtil - // pull job classes - job.getMapOutputKeyClass(), - job.getMapOutputValueClass(), - job.getOutputKeyClass(), - job.getOutputValueClass(), - job.getPartitionerClass(), - job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), - job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), - job.getCombinerClass()); + job, + // when making changes here, consider also mapreduce.TableMapReduceUtil + // pull job classes + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + job.getOutputKeyClass(), + job.getOutputValueClass(), + job.getPartitionerClass(), + job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), + job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), + job.getCombinerClass()); } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java index 2b6f8b0..628430c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -17,12 +17,13 @@ */ package org.apache.hadoop.hbase.mapred; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; import org.apache.hadoop.mapred.InputFormat; @@ -59,8 +60,9 @@ public class TableSnapshotInputFormat implements InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..bd530c8 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * MultiTableSnapshotInputFormat generalizes + * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * allowing a MapReduce job to run over one or more table snapshots, with one or more scans + * configured for each. + * Internally, the input format delegates to + * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} + * and thus has the same performance advantages; + * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for + * more details. + * Usage is similar to TableSnapshotInputFormat, with the following exception: + * initMultiTableSnapshotMapperJob takes in a map + * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding + * scan will be applied; + * the overall dataset for the job is defined by the concatenation of the regions and tables + * included in each snapshot/scan + * pair. + * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob + * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache + * .hadoop.fs.Path)} + * can be used to configure the job. + *
<pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
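For the mapreduce API the wiring is analogous, with a Job instead of a JobConf. A possible end-to-end driver sketch, again with hypothetical class names (MultiSnapshotDriver, MyTableMapper) and assuming this patch is applied:

```java
// Hedged sketch of a mapreduce driver; assumes the patch is applied.
// MultiSnapshotDriver and MyTableMapper are hypothetical placeholders.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MultiSnapshotDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "multi-snapshot-scan");
    job.setJarByClass(MultiSnapshotDriver.class);

    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
        "snapshot1", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
        "snapshot2", ImmutableList.<Scan>of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

    Path restoreDir = new Path("/tmp/snapshot_restore_dir");

    // Restores the snapshots under restoreDir and configures MultiTableSnapshotInputFormat,
    // the mapper class and the map output types on the Job.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```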
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp + * directory. Input splits and + * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce + * .TableSnapshotInputFormat} + * (one per region). + * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on + * permissioning; the + * same caveats apply here. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat { + + private final MultiTableSnapshotInputFormatImpl delegate; + + public MultiTableSnapshotInputFormat() { + this.delegate = new MultiTableSnapshotInputFormatImpl(); + } + + @Override + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + List splits = + delegate.getSplits(jobContext.getConfiguration()); + List rtn = Lists.newArrayListWithCapacity(splits.size()); + + for (TableSnapshotInputFormatImpl.InputSplit split : splits) { + rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split)); + } + + return rtn; + } + + public static void setInput(Configuration configuration, + Map> snapshotScans, Path tmpRestoreDir) throws IOException { + new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..1b8b6c0 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.ConfigurationUtil; +import org.apache.hadoop.hbase.util.FSUtils; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Shared implementation of mapreduce code over multiple table snapshots. + * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormat} and mapred + * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. + */ +@InterfaceAudience.LimitedPrivate({ "HBase" }) +@InterfaceStability.Evolving +public class MultiTableSnapshotInputFormatImpl { + + private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class); + + public static final String RESTORE_DIRS_KEY = + "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; + public static final String SNAPSHOT_TO_SCANS_KEY = + "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of + * restoreDir. 
+ * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} + * + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public void setInput(Configuration conf, Map> snapshotScans, + Path restoreDir) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + setSnapshotToScans(conf, snapshotScans); + Map restoreDirs = + generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir); + setSnapshotDirs(conf, restoreDirs); + restoreSnapshots(conf, restoreDirs, fs); + } + + /** + * Return the list of splits extracted from the scans/snapshots pushed to conf by + * {@link + * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)} + * + * @param conf Configuration to determine splits from + * @return Return the list of splits extracted from the scans/snapshots pushed to conf + * @throws IOException + */ + public List getSplits(Configuration conf) + throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + List rtn = Lists.newArrayList(); + + Map> snapshotsToScans = getSnapshotsToScans(conf); + Map snapshotsToRestoreDirs = getSnapshotDirs(conf); + for (Map.Entry> entry : snapshotsToScans.entrySet()) { + String snapshotName = entry.getKey(); + + Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); + + SnapshotManifest manifest = + TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); + List regionInfos = + TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); + + for (Scan scan : entry.getValue()) { + List splits = + TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf); + rtn.addAll(splits); + } + } + return rtn; + } + + /** + * Retrieve the snapshot name -> list mapping pushed to configuration by + * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)} + * + * @param conf Configuration to extract name -> list mappings from. 
+ * @return the snapshot name -> list mapping pushed to configuration + * @throws IOException + */ + public Map> getSnapshotsToScans(Configuration conf) throws IOException { + + Map> rtn = Maps.newHashMap(); + + for (Map.Entry entry : ConfigurationUtil + .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) { + String snapshotName = entry.getKey(); + String scan = entry.getValue(); + + Collection snapshotScans = rtn.get(snapshotName); + if (snapshotScans == null) { + snapshotScans = Lists.newArrayList(); + rtn.put(snapshotName, snapshotScans); + } + + snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); + } + + return rtn; + } + + /** + * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) + * + * @param conf + * @param snapshotScans + * @throws IOException + */ + public void setSnapshotToScans(Configuration conf, Map> snapshotScans) + throws IOException { + // flatten out snapshotScans for serialization to the job conf + List> snapshotToSerializedScans = Lists.newArrayList(); + + for (Map.Entry> entry : snapshotScans.entrySet()) { + String snapshotName = entry.getKey(); + Collection scans = entry.getValue(); + + // serialize all scans and map them to the appropriate snapshot + for (Scan scan : scans) { + snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry(snapshotName, + TableMapReduceUtil.convertScanToString(scan))); + } + } + + ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); + } + + /** + * Retrieve the directories into which snapshots have been restored from + * ({@link #RESTORE_DIRS_KEY}) + * + * @param conf Configuration to extract restore directories from + * @return the directories into which snapshots have been restored from + * @throws IOException + */ + public Map getSnapshotDirs(Configuration conf) throws IOException { + List> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY); + Map rtn = Maps.newHashMapWithExpectedSize(kvps.size()); + + for (Map.Entry kvp : kvps) { + rtn.put(kvp.getKey(), new Path(kvp.getValue())); + } + + return rtn; + } + + public void setSnapshotDirs(Configuration conf, Map snapshotDirs) { + Map toSet = Maps.newHashMap(); + + for (Map.Entry entry : snapshotDirs.entrySet()) { + toSet.put(entry.getKey(), entry.getValue().toString()); + } + + ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); + } + + /** + * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and + * return a map from the snapshot to the restore directory. 
+ * + * @param snapshots collection of snapshot names to restore + * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored + * @return a mapping from snapshot name to the directory in which that snapshot has been restored + */ + private Map generateSnapshotToRestoreDirMapping(Collection snapshots, + Path baseRestoreDir) { + Map rtn = Maps.newHashMap(); + + for (String snapshotName : snapshots) { + Path restoreSnapshotDir = + new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); + rtn.put(snapshotName, restoreSnapshotDir); + } + + return rtn; + } + + /** + * Restore each (snapshot name, restore directory) pair in snapshotToDir + * + * @param conf configuration to restore with + * @param snapshotToDir mapping from snapshot names to restore directories + * @param fs filesystem to do snapshot restoration on + * @throws IOException + */ + public void restoreSnapshots(Configuration conf, Map snapshotToDir, FileSystem fs) + throws IOException { + // TODO: restore from record readers to parallelize. + Path rootDir = FSUtils.getRootDir(conf); + + for (Map.Entry entry : snapshotToDir.entrySet()) { + String snapshotName = entry.getKey(); + Path restoreDir = entry.getValue(); + LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir + + " for MultiTableSnapshotInputFormat"); + restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); + } + } + + void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, + FileSystem fs) throws IOException { + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + } + + // TODO: these probably belong elsewhere/may already be implemented elsewhere. + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index d602bed..b553525 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -24,13 +24,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; @@ -282,6 +276,43 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from one or more table snapshots, with one or more scans + * per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). 
+ */ + public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, + Class mapper, Class outputKeyClass, Class outputValueClass, + Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); + + job.setInputFormatClass(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + + if (addDependencyJars) { + addDependencyJars(job); + } + + resetCacheConfig(job.getConfiguration()); + } + + /** * Sets up the job for reading from a table snapshot. It bypasses hbase servers * and read directly from snapshot files. * diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index 745af24..a3799ba 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -18,33 +18,16 @@ package org.apache.hadoop.hbase.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.TableSnapshotScanner; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; -import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; -import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; -import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -53,17 +36,22 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.google.common.annotations.VisibleForTesting; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; /** * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. 
The job * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits, - * hlogs, etc) directly to provide maximum performance. The snapshot is not required to be + * wals, etc) directly to provide maximum performance. The snapshot is not required to be * restored to the live cluster or cloned. This also allows to run the mapreduce job from an * online or offline hbase cluster. The snapshot files can be exported by using the - * {@link ExportSnapshot} tool, to a pure-hdfs cluster, and this InputFormat can be used to - * run the mapreduce job directly over the snapshot files. The snapshot should not be deleted - * while there are jobs reading from snapshot files. + * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, + * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. + * The snapshot should not be deleted while there are jobs reading from snapshot files. *

* Usage is similar to TableInputFormat, and * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, @@ -80,8 +68,8 @@ import com.google.common.annotations.VisibleForTesting; *

* Internally, this input format restores the snapshot into the given tmp directory. Similar to * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading - * from each RecordReader. An internal RegionScanner is used to execute the {@link Scan} obtained - * from the user. + * from each RecordReader. An internal RegionScanner is used to execute the + * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user. *

* HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from * snapshot files and data files. @@ -91,17 +79,16 @@ import com.google.common.annotations.VisibleForTesting; * user or the user must have group or other privileges in the filesystem (See HBASE-8369). * Note that, given other users access to read from snapshot/data files will completely circumvent * the access control enforced by HBase. - * @see TableSnapshotScanner + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner */ @InterfaceAudience.Public @InterfaceStability.Evolving public class TableSnapshotInputFormat extends InputFormat { - private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class); - public static class TableSnapshotRegionSplit extends InputSplit implements Writable { private TableSnapshotInputFormatImpl.InputSplit delegate; + // constructor for mapreduce framework / Writable public TableSnapshotRegionSplit() { this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); } @@ -111,8 +98,9 @@ public class TableSnapshotInputFormat extends InputFormat locations) { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations); + List locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); } @Override @@ -137,7 +125,8 @@ public class TableSnapshotInputFormat extends InputFormat { + static class TableSnapshotRegionRecordReader extends + RecordReader { private TableSnapshotInputFormatImpl.RecordReader delegate = new TableSnapshotInputFormatImpl.RecordReader(); private TaskAttemptContext context; @@ -145,7 +134,7 @@ public class TableSnapshotInputFormat extends InputFormat createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException { + InputSplit split, TaskAttemptContext context) throws IOException { return new TableSnapshotRegionRecordReader(); } @@ -197,7 +185,7 @@ public class TableSnapshotInputFormat extends InputFormat getSplits(JobContext job) throws IOException, InterruptedException { List results = new ArrayList(); for (TableSnapshotInputFormatImpl.InputSplit split : - TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) { + TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) { results.add(new TableSnapshotRegionSplit(split)); } return results; @@ -212,7 +200,8 @@ public class TableSnapshotInputFormat extends InputFormat locations) { + public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List locations, + Scan scan, Path restoreDir) { this.htd = htd; this.regionInfo = regionInfo; if (locations == null || locations.isEmpty()) { @@ -96,6 +98,25 @@ public class TableSnapshotInputFormatImpl { } else { this.locations = locations.toArray(new String[locations.size()]); } + try { + this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : ""; + } catch (IOException e) { + LOG.warn("Failed to convert Scan to String", e); + } + + this.restoreDir = restoreDir.toString(); + } + + public HTableDescriptor getHtd() { + return htd; + } + + public String getScan() { + return scan; + } + + public String getRestoreDir() { + return restoreDir; } public long getLength() { @@ -119,15 +140,15 @@ public class TableSnapshotInputFormatImpl { // doing this wrapping with Writables. 
@Override public void write(DataOutput out) throws IOException { - MapReduceProtos.TableSnapshotRegionSplit.Builder builder = MapReduceProtos.TableSnapshotRegionSplit.newBuilder() - .setTable(htd.convert()) - .setRegion(HRegionInfo.convert(regionInfo)); + TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder() + .setTable(htd.convert()) + .setRegion(HRegionInfo.convert(regionInfo)); for (String location : locations) { builder.addLocations(location); } - MapReduceProtos.TableSnapshotRegionSplit split = builder.build(); + TableSnapshotRegionSplit split = builder.build(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); split.writeTo(baos); @@ -135,6 +156,10 @@ public class TableSnapshotInputFormatImpl { byte[] buf = baos.toByteArray(); out.writeInt(buf.length); out.write(buf); + + Bytes.writeByteArray(out, Bytes.toBytes(scan)); + Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); + } @Override @@ -147,6 +172,9 @@ public class TableSnapshotInputFormatImpl { this.regionInfo = HRegionInfo.convert(split.getRegion()); List locationsList = split.getLocationsList(); this.locations = locationsList.toArray(new String[locationsList.size()]); + + this.scan = Bytes.toString(Bytes.readByteArray(in)); + this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); } } @@ -154,7 +182,7 @@ public class TableSnapshotInputFormatImpl { * Implementation class for RecordReader logic common between mapred and mapreduce. */ public static class RecordReader { - InputSplit split; + private InputSplit split; private Scan scan; private Result result = null; private ImmutableBytesWritable row = null; @@ -165,28 +193,12 @@ public class TableSnapshotInputFormatImpl { } public void initialize(InputSplit split, Configuration conf) throws IOException { + this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); this.split = split; HTableDescriptor htd = split.htd; HRegionInfo hri = this.split.getRegionInfo(); FileSystem fs = FSUtils.getCurrentFileSystem(conf); - Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root - // directory where snapshot was restored - - // create scan - // TODO: mapred does not support scan as input API. Work around for now. 
- if (conf.get(TableInputFormat.SCAN) != null) { - scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); - } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { - String[] columns = - conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); - scan = new Scan(); - for (String col : columns) { - scan.addFamily(Bytes.toBytes(col)); - } - } else { - throw new IllegalArgumentException("A Scan is not configured for this job"); - } // region is immutable, this should be fine, // otherwise we have to set the thread read point @@ -194,7 +206,8 @@ public class TableSnapshotInputFormatImpl { // disable caching of data blocks scan.setCacheBlocks(false); - scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null); + scanner = + new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); } public boolean nextKeyValue() throws IOException { @@ -237,24 +250,43 @@ public class TableSnapshotInputFormatImpl { public static List getSplits(Configuration conf) throws IOException { String snapshotName = getSnapshotName(conf); - Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); - Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); - HBaseProtos.SnapshotDescription snapshotDesc = - SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); - SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); - List regionManifests = manifest.getRegionManifests(); + List regionInfos = getRegionInfosFromManifest(manifest); + + // TODO: mapred does not support scan as input API. Work around for now. + Scan scan = extractScanFromConf(conf); + // the temp dir where the snapshot is restored + Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + + return getSplits(scan, manifest, regionInfos, restoreDir, conf); + } + public static List getRegionInfosFromManifest(SnapshotManifest manifest) { + List regionManifests = manifest.getRegionManifests(); if (regionManifests == null) { - throw new IllegalArgumentException("Snapshot seems empty"); + throw new IllegalArgumentException("Snapshot seems empty"); } - // load table descriptor - HTableDescriptor htd = manifest.getTableDescriptor(); + List regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); - // TODO: mapred does not support scan as input API. Work around for now. 
+ for (SnapshotRegionManifest regionManifest : regionManifests) { + regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo())); + } + return regionInfos; + } + + public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, + Path rootDir, FileSystem fs) throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + } + + public static Scan extractScanFromConf(Configuration conf) throws IOException { Scan scan = null; if (conf.get(TableInputFormat.SCAN) != null) { scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); @@ -268,29 +300,35 @@ public class TableSnapshotInputFormatImpl { } else { throw new IllegalArgumentException("Unable to create scan"); } - // the temp dir where the snapshot is restored - Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + return scan; + } + + public static List getSplits(Scan scan, SnapshotManifest manifest, + List regionManifests, Path restoreDir, Configuration conf) throws IOException { + // load table descriptor + HTableDescriptor htd = manifest.getTableDescriptor(); + Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); List splits = new ArrayList(); - for (SnapshotRegionManifest regionManifest : regionManifests) { + for (HRegionInfo hri : regionManifests) { // load region descriptor - HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); - if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), - hri.getStartKey(), hri.getEndKey())) { + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), + hri.getEndKey())) { // compute HDFS locations from snapshot files (which will get the locations for // referred hfiles) List hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); int len = Math.min(3, hosts.size()); hosts = hosts.subList(0, len); - splits.add(new InputSplit(htd, hri, hosts)); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } return splits; + } /** @@ -298,7 +336,8 @@ public class TableSnapshotInputFormatImpl { * weights into account, thus will treat every location passed from the input split as equal. We * do not want to blindly pass all the locations, since we are creating one split per region, and * the region's blocks are all distributed throughout the cluster unless favorite node assignment - * is used. On the expected stable case, only one location will contain most of the blocks as local. + * is used. On the expected stable case, only one location will contain most of the blocks as + * local. * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. 
Here * we are doing a simple heuristic, where we will pass all hosts which have at least 80% * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top @@ -355,7 +394,7 @@ public class TableSnapshotInputFormatImpl { throws IOException { conf.set(SNAPSHOT_NAME_KEY, snapshotName); - Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); + Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); restoreDir = new Path(restoreDir, UUID.randomUUID().toString()); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java new file mode 100644 index 0000000..598d973 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConfigurationUtil.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * Utilities for storing more complex collection types in + * {@link org.apache.hadoop.conf.Configuration} instances. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class ConfigurationUtil { + // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet, + // nor is it valid for paths + public static final char KVP_DELIMITER = '^'; + + // Disallow instantiation + private ConfigurationUtil() { + + } + + /** + * Store a collection of Map.Entry's in conf, with each entry separated by ',' + * and key values delimited by {@link #KVP_DELIMITER} + * + * @param conf configuration to store the collection in + * @param key overall key to store keyValues under + * @param keyValues kvps to be stored under key in conf + */ + public static void setKeyValues(Configuration conf, String key, + Collection> keyValues) { + setKeyValues(conf, key, keyValues, KVP_DELIMITER); + } + + /** + * Store a collection of Map.Entry's in conf, with each entry separated by ',' + * and key values delimited by delimiter. 
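ConfigurationUtil is what lets MultiTableSnapshotInputFormatImpl stash its snapshot-to-scans and snapshot-to-restore-dir mappings in a plain Configuration: each Map.Entry is flattened to key + '^' + value, and the flattened entries are stored comma-separated under a single configuration key. A small round-trip sketch of that mechanism, assuming the patch is applied (the configuration key and the values below are made up for illustration):

```java
// Hedged round-trip sketch for ConfigurationUtil; key and values are illustrative only.
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ConfigurationUtil;

public class ConfigurationUtilRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Entries to store; keys and values must not contain the '^' delimiter or commas,
    // since Configuration.getStrings splits on ',' and getKeyValues splits on '^'.
    List<Map.Entry<String, String>> kvps = Lists.newArrayList();
    kvps.add(new AbstractMap.SimpleImmutableEntry<String, String>("snapshot1", "/tmp/restore/s1"));
    kvps.add(new AbstractMap.SimpleImmutableEntry<String, String>("snapshot2", "/tmp/restore/s2"));

    // Stored under one key as "snapshot1^/tmp/restore/s1,snapshot2^/tmp/restore/s2"
    ConfigurationUtil.setKeyValues(conf, "example.kvp.key", kvps);

    // Read the entries back and print them.
    for (Map.Entry<String, String> entry :
        ConfigurationUtil.getKeyValues(conf, "example.kvp.key")) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}
```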
+ * + * @param conf configuration to store the collection in + * @param key overall key to store keyValues under + * @param keyValues kvps to be stored under key in conf + * @param delimiter character used to separate each kvp + */ + public static void setKeyValues(Configuration conf, String key, + Collection> keyValues, char delimiter) { + List serializedKvps = Lists.newArrayList(); + + for (Map.Entry kvp : keyValues) { + serializedKvps.add(kvp.getKey() + delimiter + kvp.getValue()); + } + + conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()])); + } + + /** + * Retrieve a list of key value pairs from configuration, stored under the provided key + * + * @param conf configuration to retrieve kvps from + * @param key key under which the key values are stored + * @return the list of kvps stored under key in conf, or null if the key isn't present. + * @see #setKeyValues(Configuration, String, Collection, char) + */ + public static List> getKeyValues(Configuration conf, String key) { + return getKeyValues(conf, key, KVP_DELIMITER); + } + + /** + * Retrieve a list of key value pairs from configuration, stored under the provided key + * + * @param conf configuration to retrieve kvps from + * @param key key under which the key values are stored + * @param delimiter character used to separate each kvp + * @return the list of kvps stored under key in conf, or null if the key isn't present. + * @see #setKeyValues(Configuration, String, Collection, char) + */ + public static List> getKeyValues(Configuration conf, String key, + char delimiter) { + String[] kvps = conf.getStrings(key); + + if (kvps == null) { + return null; + } + + List> rtn = Lists.newArrayList(); + + for (String kvp : kvps) { + String[] splitKvp = StringUtils.split(kvp, delimiter); + + if (splitKvp.length != 2) { + throw new IllegalArgumentException( + "Expected key value pair for configuration key '" + key + "'" + " to be of form '" + + delimiter + "; was " + kvp + " instead"); + } + + rtn.add(new AbstractMap.SimpleImmutableEntry(splitKvp[0], splitKvp[1])); + } + return rtn; + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 6b02205..b5f5b58 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -118,7 +118,7 @@ import org.apache.zookeeper.ZooKeeper.States; * Create an instance and keep it around testing HBase. This class is * meant to be your one-stop shop for anything you might need testing. Manages * one cluster at a time only. Managed cluster can be an in-process - * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}. + * {@link MiniHBaseCluster}, or a deployed cluster of type {@link HBaseCluster}. * Not all methods work with the real cluster. * Depends on log4j being on classpath and * hbase-site.xml for logging and test-run configuration. It does not set @@ -1508,6 +1508,25 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** + * Create a table with multiple regions. + * @param tableName + * @param family + * @param numRegions + * @return An HTable instance for the created table. 
+ * @throws IOException + */ + public HTable createMultiRegionTable(TableName tableName, byte[] family, int numRegions) + throws IOException { + if (numRegions < 3) throw new IOException("Must create at least 3 regions"); + byte[] startKey = Bytes.toBytes("aaaaa"); + byte[] endKey = Bytes.toBytes("zzzzz"); + byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); + + return createTable(tableName.toBytes(), new byte[][] { family }, splitKeys); + } + + + /** * Drop an existing table * @param tableName existing table */ diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..3bb188d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapred; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@Category({ LargeTests.class }) +public class TestMultiTableSnapshotInputFormat + extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat { + + private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class); + + @Override + protected void runJob(String jobName, Configuration c, List scans) + throws IOException, InterruptedException, ClassNotFoundException { + JobConf job = new JobConf(TEST_UTIL.getConfiguration()); + + job.setJobName(jobName); + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); + + TableMapReduceUtil.addDependencyJars(job); + + job.setReducerClass(Reducer.class); + 
job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + + RunningJob runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + assertTrue(runningJob.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper + implements TableMap { + + @Override + public void map(ImmutableBytesWritable key, Result value, + OutputCollector outputCollector, + Reporter reporter) throws IOException { + makeAssertions(key, value); + outputCollector.collect(key, key); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf jobConf) { + + } + } + + public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements + org.apache.hadoop.mapred.Reducer { + + private JobConf jobConf; + + @Override + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector outputCollector, Reporter reporter) + throws IOException { + makeAssertions(key, Lists.newArrayList(values)); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + super.cleanup(this.jobConf); + } + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java new file mode 100644 index 0000000..130fa23 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base set of tests and setup for input formats touching multiple tables. + */ +public abstract class MultiTableInputFormatTestBase { + static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class); + public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final String TABLE_NAME = "scantest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + static List TABLES = Lists.newArrayList(); + + static { + for (int i = 0; i < 3; i++) { + TABLES.add(TABLE_NAME + String.valueOf(i)); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + for (String tableName : TABLES) { + HTable table = null; + try { + table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4); + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + } finally { + if (table != null) { + table.close(); + } + } + } + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); + } + + /** + * Pass the key and value to reducer. + */ + public static class ScanMapper extends + TableMapper { + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. 
+ */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + makeAssertions(key, value); + context.write(key, key); + } + + public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> cf = + value.getMap(); + if (!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + } + } + + /** + * Checks the last and first keys seen against the scanner boundaries. + */ + public static class ScanReducer + extends + Reducer { + private String first = null; + private String last = null; + + @Override + protected void reduce(ImmutableBytesWritable key, + Iterable values, Context context) + throws IOException, InterruptedException { + makeAssertions(key, values); + } + + protected void makeAssertions(ImmutableBytesWritable key, + Iterable values) { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.debug("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + assertEquals(3, count); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + Configuration c = context.getConfiguration(); + cleanup(c); + } + + protected void cleanup(Configuration c) { + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + } + + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + @Test + public void testScanOBBToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + @Test + public void testScanYZYToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = + "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? 
last : ""); + + List scans = new ArrayList(); + + for (String tableName : TABLES) { + Scan scan = new Scan(); + + scan.addFamily(INPUT_FAMILY); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); + + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + + scans.add(scan); + + LOG.info("scan before: " + scan); + } + + runJob(jobName, c, scans); + } + + protected void runJob(String jobName, Configuration c, List scans) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = new Job(c, jobName); + + initJob(scans, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + protected abstract void initJob(List scans, Job job) throws IOException; + + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..f3e6d8d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimaps; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +@Category({ LargeTests.class }) +public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase { + + protected Path restoreDir; + + @BeforeClass + public static void setUpSnapshots() throws Exception { + + TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class); + TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class); + + // take a snapshot of every table we have. + for (String tableName : TABLES) { + SnapshotTestingUtils + .createSnapshotAndValidate(TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName), + ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null, + snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()), + TEST_UTIL.getTestFileSystem(), true); + } + } + + @Before + public void setUp() throws Exception { + this.restoreDir = new Path("/tmp"); + + } + + @Override + protected void initJob(List scans, Job job) throws IOException { + TableMapReduceUtil + .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); + } + + protected Map> getSnapshotScanMapping(final List scans) { + return Multimaps.index(scans, new Function() { + @Nullable + @Override + public String apply(Scan input) { + return snapshotNameForTable( + Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME))); + } + }).asMap(); + } + + public static String snapshotNameForTable(String tableName) { + return tableName + "_snapshot"; + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..b4b8056 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
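
Aside (not part of the patch): getSnapshotScanMapping above uses Guava's Multimaps.index to group the scans by the snapshot name derived from each scan's SCAN_ATTRIBUTES_TABLE_NAME attribute, then exposes the grouping as a Map via asMap(). For readers less familiar with Guava, a plain-collections equivalent is sketched below; it would drop into the same class and additionally needs java.util.ArrayList and java.util.HashMap.

    protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
      // Group scans by the snapshot taken of the table each scan targets.
      Map<String, Collection<Scan>> mapping = new HashMap<String, Collection<Scan>>();
      for (Scan scan : scans) {
        String snapshotName = snapshotNameForTable(
            Bytes.toStringBinary(scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
        Collection<Scan> scansForSnapshot = mapping.get(snapshotName);
        if (scansForSnapshot == null) {
          scansForSnapshot = new ArrayList<Scan>();
          mapping.put(snapshotName, scansForSnapshot);
        }
        scansForSnapshot.add(scan);
      }
      return mapping;
    }
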
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; + +@Category({ SmallTests.class }) +public class TestMultiTableSnapshotInputFormatImpl { + + private MultiTableSnapshotInputFormatImpl subject; + private Map> snapshotScans; + private Path restoreDir; + private Configuration conf; + private Path rootDir; + + @Before + public void setUp() throws Exception { + this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl()); + + // mock out restoreSnapshot + // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper + // dependency into the + // input format. However, we need a new RestoreSnapshotHelper per snapshot in the current + // design, and it *also* + // feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would + // probably be the more "pure" + // way of doing things. This is the lesser of two evils, perhaps? + doNothing().when(this.subject). + restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class), + any(Path.class), any(FileSystem.class)); + + this.conf = new Configuration(); + this.rootDir = new Path("file:///test-root-dir"); + FSUtils.setRootDir(conf, rootDir); + this.snapshotScans = ImmutableMap.>of("snapshot1", + ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2", + ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")), + new Scan(Bytes.toBytes("5"), Bytes.toBytes("6")))); + + this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir"); + + } + + public void callSetInput() throws IOException { + subject.setInput(this.conf, snapshotScans, restoreDir); + } + + public Map> toScanWithEquals( + Map> snapshotScans) throws IOException { + Map> rtn = Maps.newHashMap(); + + for (Map.Entry> entry : snapshotScans.entrySet()) { + List scans = Lists.newArrayList(); + + for (Scan scan : entry.getValue()) { + scans.add(new ScanWithEquals(scan)); + } + rtn.put(entry.getKey(), scans); + } + + return rtn; + } + + public static class ScanWithEquals { + + private final String startRow; + private final String stopRow; + + /** + * Creates a new instance of this class while copying all values. + * + * @param scan The scan instance to copy from. + * @throws java.io.IOException When copying the values fails. 
+ */ + public ScanWithEquals(Scan scan) throws IOException { + this.startRow = Bytes.toStringBinary(scan.getStartRow()); + this.stopRow = Bytes.toStringBinary(scan.getStopRow()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ScanWithEquals)) { + return false; + } + ScanWithEquals otherScan = (ScanWithEquals) obj; + return Objects.equals(this.startRow, otherScan.startRow) && Objects + .equals(this.stopRow, otherScan.stopRow); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this).add("startRow", startRow) + .add("stopRow", stopRow).toString(); + } + } + + @Test + public void testSetInputSetsSnapshotToScans() throws Exception { + + callSetInput(); + + Map> actual = subject.getSnapshotsToScans(conf); + + // convert to scans we can use .equals on + Map> actualWithEquals = toScanWithEquals(actual); + Map> expectedWithEquals = toScanWithEquals(snapshotScans); + + assertEquals(expectedWithEquals, actualWithEquals); + } + + @Test + public void testSetInputPushesRestoreDirectories() throws Exception { + callSetInput(); + + Map restoreDirs = subject.getSnapshotDirs(conf); + + assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet()); + } + + @Test + public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception { + callSetInput(); + + Map restoreDirs = subject.getSnapshotDirs(conf); + + for (Path snapshotDir : restoreDirs.values()) { + assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir, + snapshotDir.getParent()); + } + } + + @Test + public void testSetInputRestoresSnapshots() throws Exception { + callSetInput(); + + Map snapshotDirs = subject.getSnapshotDirs(conf); + + for (Map.Entry entry : snapshotDirs.entrySet()) { + verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir), + eq(entry.getValue()), any(FileSystem.class)); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java index e022d6b..25e0a9d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java @@ -112,6 +112,6 @@ public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase { @Test public void testScanFromConfiguration() throws IOException, InterruptedException, ClassNotFoundException { - testScanFromConfiguration("bba", "bbd", "bbc"); + testScan("bba", "bbd", "bbc"); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java new file mode 100644 index 0000000..a9ecf9e --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestConfigurationUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
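
Aside (not part of the patch), on ScanWithEquals in TestMultiTableSnapshotInputFormatImpl above: Scan does not override equals()/hashCode(), so two scans over the same row range only compare equal by identity, and the wrapper restricts comparison to the start/stop rows the test cares about. A minimal illustration follows; it assumes org.junit.Assert.assertFalse in addition to the file's existing static imports.

    @Test
    public void scansCompareByRowBoundsOnlyWhenWrapped() throws IOException {
      Scan a = new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"));
      Scan b = new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"));

      assertFalse(a.equals(b));  // Scan falls back to Object identity
      assertEquals(new ScanWithEquals(a), new ScanWithEquals(b));  // same row bounds
    }
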
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +@Category({ SmallTests.class }) +public class TestConfigurationUtil { + + private Configuration conf; + private Map keyValues; + private String key; + + @Before + public void setUp() throws Exception { + this.conf = new Configuration(); + this.keyValues = ImmutableMap.of("k1", "v1", "k2", "v2"); + this.key = "my_conf_key"; + } + + public void callSetKeyValues() { + ConfigurationUtil.setKeyValues(conf, key, keyValues.entrySet()); + } + + public List> callGetKeyValues() { + return ConfigurationUtil.getKeyValues(conf, key); + } + + @Test + public void testGetAndSetKeyValuesWithValues() throws Exception { + callSetKeyValues(); + assertEquals(Lists.newArrayList(this.keyValues.entrySet()), callGetKeyValues()); + } + + @Test + public void testGetKeyValuesWithUnsetKey() throws Exception { + assertNull(callGetKeyValues()); + } + +}
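
Aside (not part of the patch): TestConfigurationUtil above is the only direct exercise of the new ConfigurationUtil helper, which supports MultiTableSnapshotInputFormatImpl's storage of its snapshot-to-scan and snapshot-to-restore-directory mappings in the job configuration (see the Impl tests earlier). A standalone round-trip sketch follows; only setKeyValues/getKeyValues come from the patch, while the class name, configuration key, and values here are illustrative.

    package org.apache.hadoop.hbase.util;

    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.conf.Configuration;

    public class ConfigurationUtilExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Map<String, String> snapshotDirs = ImmutableMap.of(
            "snapshot1", "/tmp/restore/dir1",
            "snapshot2", "/tmp/restore/dir2");

        // Serialize the entries under a single configuration key ...
        ConfigurationUtil.setKeyValues(conf, "example.snapshot.dirs", snapshotDirs.entrySet());

        // ... and read them back as a list of key/value pairs; the test above
        // relies on the pairs coming back in the order they were set.
        List<Map.Entry<String, String>> restored =
            ConfigurationUtil.getKeyValues(conf, "example.snapshot.dirs");
        for (Map.Entry<String, String> entry : restored) {
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
      }
    }
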