diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..9f094e2
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,99 @@
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans configured for each.
+ * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * more details.
+ *
+ *
+ *
+ * Usage is similar to TableSnapshotInputFormat, with the following exception: initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables included in each snapshot/scan
+ * pair.
+ *
+ * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf, boolean, Path)}
+ * can be used to configure the job.
+ *
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ * MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ *
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp directory. Input splits and
+ * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ *
+ *
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+
+ private final MultiTableSnapshotInputFormatImpl delegate;
+
+ public MultiTableSnapshotInputFormat() {
+ this.delegate = new MultiTableSnapshotInputFormatImpl();
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ delegate.getSplits(job);
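+ // wrap each shared-implementation split in a mapred-specific TableSnapshotRegionSplit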
+ InputSplit[] results = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ results[i] = new TableSnapshotRegionSplit(splits.get(i));
+ }
+ return results;
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
+ }
+
+
+ /**
+ * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of restoreDir.
+ *
+ * Sets: {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
+ * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
+ * @param conf
+ * @param snapshotScans
+ * @param restoreDir
+ * @throws IOException
+ */
+ public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans, Path restoreDir) throws IOException {
+ new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index b5fefbb..5a8b672 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -19,8 +19,9 @@
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
@@ -30,25 +31,18 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
-import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.token.Token;
-import org.apache.zookeeper.KeeperException;
/**
* Utility for {@link TableMap} and {@link TableReduce}
@@ -128,6 +122,44 @@ public class TableMapReduceUtil {
}
/**
+ * Sets up the job for reading from one or more table snapshots, with one or more scans
+ * per snapshot. It bypasses hbase servers and reads directly from snapshot files.
+ *
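+ * A minimal usage sketch (MyTableMap and the restore path are illustrative placeholders):
+ * <pre>{@code
+ * JobConf job = new JobConf(HBaseConfiguration.create());
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
+ *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
+ *     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
+ * }</pre>
+ *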
+ * @param snapshotScans map of snapshot name to scans on that snapshot.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ */
+ public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf job,
+ boolean addDependencyJars, Path tmpRestoreDir
+ ) throws IOException {
+ MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);
+
+
+ job.setInputFormat(MultiTableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
+ }
+
+
+ /**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
index 1c5e4bd..502ee4a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
@@ -60,8 +61,8 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result>
   public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-      List<String> locations) {
- this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+      List<String> locations, Scan scan, Path restoreDir) {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..b761eae
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,80 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * allowing a MapReduce job to run over one or more table snapshots, with one or more scans configured for each.
+ * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for
+ * more details.
+ *
+ *
+ *
+ * Usage is similar to TableSnapshotInputFormat, with the following exception: initMultiTableSnapshotMapperJob takes in a map
+ * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding scan will be applied;
+ * the overall dataset for the job is defined by the concatenation of the regions and tables included in each snapshot/scan
+ * pair.
+ *
+ * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache.hadoop.fs.Path)}
+ * can be used to configure the job.
+ *
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ * MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ *
+ * Internally, this input format restores each snapshot into a subdirectory of the given tmp directory. Input splits and
+ * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ *
+ *
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on permissioning; the
+ * same caveats apply here.
+ *
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+ private final MultiTableSnapshotInputFormatImpl delegate;
+
+ public MultiTableSnapshotInputFormat() {
+ this.delegate = new MultiTableSnapshotInputFormatImpl();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(jobContext.getConfiguration());
+ List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+ for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+ rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+ }
+
+ return rtn;
+ }
+
+
+ public static void setInput(Configuration configuration, Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+ new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..171294a
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,247 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Shared implementation of mapreduce code over multiple table snapshots.
+ *
+ * Utilized by both the mapreduce ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat})
+ * and mapred ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
+ */
+public class MultiTableSnapshotInputFormatImpl {
+
+ private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormat.class);
+
+ // TODO: hopefully this is a good delimiter; it's not in the base64 alphabet, nor is it valid for paths
+ public static final char KVP_DELIMITER = '^';
+
+ public static final String RESTORE_DIRS_KEY = "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
+ public static final String SNAPSHOT_TO_SCANS_KEY = "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
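+
+ // Illustrative layout of the values stored under these keys after setInput(conf, snapshotScans, restoreDir);
+ // each value is a comma-separated list of entries of the form key KVP_DELIMITER value, e.g.
+ //   snapshotsToScans:           "snapshot1^<base64-encoded Scan>,snapshot2^<base64-encoded Scan>"
+ //   restore.snapshotDirMapping: "snapshot1^<restoreDir>/snapshot1__<uuid>,snapshot2^<restoreDir>/snapshot2__<uuid>"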
+
+ /**
+ * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of restoreDir.
+ *
+ * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
+ * @param conf
+ * @param snapshotScans
+ * @param restoreDir
+ * @throws IOException
+ */
+ public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans, Path restoreDir) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
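+ // serialize the snapshot -> scans mapping, assign each snapshot a unique restore directory under
+ // restoreDir, record that mapping in the conf, and restore the snapshots up front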
+ setSnapshotToScans(conf, snapshotScans);
+ Map<String, Path> restoreDirs = generateSnapshotToRestoreDir(snapshotScans.keySet(), restoreDir);
+ setSnapshotDirs(conf, restoreDirs);
+ restoreSnapshots(conf, restoreDirs, fs);
+ }
+
+ /**
+ * Return the list of splits extracted from the scans/snapshots pushed to conf by {@link #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
+
+ Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
+ Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
+ String snapshotName = entry.getKey();
+
+
+ Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
+
+ SnapshotManifest manifest = TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
+ List<HRegionInfo> regionInfos = TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
+
+ for (Scan scan : entry.getValue()) {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits = TableSnapshotInputFormatImpl.getSplits(
+ scan,
+ manifest,
+ regionInfos,
+ restoreDir,
+ conf);
+ for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+ rtn.add(split);
+ }
+ }
+ }
+ return rtn;
+ }
+
+ /**
+ * Retrieve the snapshot name -> list of scans mapping pushed to configuration by {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
+
+ Map<String, Collection<Scan>> rtn = Maps.newHashMap();
+
+ for (Map.Entry<String, String> entry : getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
+ String snapshotName = entry.getKey();
+ String scan = entry.getValue();
+
+ Collection<Scan> snapshotScans = rtn.get(snapshotName);
+ if (snapshotScans == null) {
+ snapshotScans = Lists.newArrayList();
+ rtn.put(snapshotName, snapshotScans);
+ }
+
+ snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
+ }
+
+ return rtn;
+ }
+
+ /**
+ * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
+ * @param conf
+ * @param snapshotScans
+ * @throws IOException
+ */
+ public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans) throws IOException {
+ // flatten out snapshotScans for serialization to the job conf
+ List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
+
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+ String snapshotName = entry.getKey();
+ Collection<Scan> scans = entry.getValue();
+
+ // serialize all scans and map them to the appropriate snapshot
+ for (Scan scan : scans) {
+ snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(
+ snapshotName, TableMapReduceUtil.convertScanToString(scan)));
+ }
+ }
+
+ setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
+ }
+
+ /**
+ * Retrieve the directories into which snapshots have been restored from ({@link #RESTORE_DIRS_KEY})
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
+ List<Map.Entry<String, String>> kvps = getKeyValues(conf, RESTORE_DIRS_KEY);
+ Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
+
+ for (Map.Entry<String, String> kvp : kvps) {
+ rtn.put(kvp.getKey(), new Path(kvp.getValue()));
+ }
+
+ return rtn;
+ }
+
+ public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
+ Map<String, String> toSet = Maps.newHashMap();
+
+ for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+ toSet.put(entry.getKey(), entry.getValue().toString());
+ }
+
+ setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
+ }
+
+ /**
+ * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and return a map from the snapshot
+ * to the restore directory.
+ * @param snapshots
+ * @param baseRestoreDir
+ * @return
+ */
+ private Map<String, Path> generateSnapshotToRestoreDir(Collection<String> snapshots, Path baseRestoreDir) {
+ Map<String, Path> rtn = Maps.newHashMap();
+
+ for (String snapshotName : snapshots) {
+ Path restoreSnapshotDir = new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
+ rtn.put(snapshotName, restoreSnapshotDir);
+ }
+
+ return rtn;
+ }
+
+ /**
+ * Restore each (snapshot name, restore directory) pair in snapshotToDir
+ * @param conf
+ * @param snapshotToDir
+ * @param fs
+ * @throws IOException
+ */
+ public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs) throws IOException {
+ // TODO: restore from record readers to parallelize.
+ Path rootDir = FSUtils.getRootDir(conf);
+
+ for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
+ String snapshotName = entry.getKey();
+ Path restoreDir = entry.getValue();
+ LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir + " for MultiTableSnapshotInputFormat");
+ restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
+ }
+ }
+
+ void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, FileSystem fs) throws IOException {
+ RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+ }
+
+
+ // TODO: these probably belong elsewhere/may already be implemented elsewhere.
+
+ /**
+ * Store a collection of Map.Entry's in conf, with each entry separated by ',' and the key and value delimited by {@link #KVP_DELIMITER}
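+ * e.g. storing entries ("s1", "v1") and ("s2", "v2") under key "k" results in
+ * conf.setStrings("k", "s1^v1", "s2^v2").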
+ * @param conf
+ * @param key
+ * @param keyValues
+ */
+ private static void setKeyValues(Configuration conf, String key, Collection<Map.Entry<String, String>> keyValues) {
+ List<String> serializedKvps = Lists.newArrayList();
+
+ for (Map.Entry<String, String> kvp : keyValues) {
+ serializedKvps.add(kvp.getKey() + KVP_DELIMITER + kvp.getValue());
+ }
+
+ conf.setStrings(key, serializedKvps.toArray(new String[serializedKvps.size()]));
+ }
+
+ private static List<Map.Entry<String, String>> getKeyValues(Configuration conf, String key) {
+ String[] kvps = conf.getStrings(key);
+
+ List<Map.Entry<String, String>> rtn = Lists.newArrayList();
+
+ for (String kvp : kvps) {
+ String[] splitKvp = StringUtils.split(kvp, KVP_DELIMITER);
+
+ if (splitKvp.length != 2) {
+ throw new IllegalArgumentException("Expected key value pair for configuration key '" + key + "'"
+ + " to be of form '" + KVP_DELIMITER + "; was " + kvp + " instead");
+ }
+
+ rtn.add(new AbstractMap.SimpleImmutableEntry<>(splitKvp[0], splitKvp[1]));
+ }
+ return rtn;
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 149752b..e358453 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -22,13 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@@ -307,6 +301,46 @@ public class TableMapReduceUtil {
}
/**
+ * Sets up the job for reading from one or more table snapshots, with one or more scans
+ * per snapshot. It bypasses hbase servers and reads directly from snapshot files.
+ *
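+ * A minimal usage sketch (MyTableMapper and the restore path are illustrative placeholders):
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
+ *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMapper.class,
+ *     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
+ * }</pre>
+ *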
+ * @param snapshotScans map of snapshot name to scans on that snapshot.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ */
+ public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Path tmpRestoreDir
+ ) throws IOException {
+ MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
+
+ job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ addDependencyJars(job.getConfiguration(), MetricsRegistry.class);
+ }
+
+ resetCacheConfig(job.getConfiguration());
+ }
+
+ /**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 44d88c5..de0f69b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
@@ -98,8 +99,8 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result>
   public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
-      List<String> locations) {
- this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
+      List<String> locations, Scan scan, Path restoreDir) {
+ this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 8496868..a6b3306 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -35,11 +38,13 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
@@ -61,9 +66,12 @@ public class TableSnapshotInputFormatImpl {
// TODO: Snapshots files are owned in fs by the hbase user. There is no
// easy way to delegate access.
+ public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
+
+
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
// key for specifying the root dir of the restored snapshot
- private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+ protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
private static final String LOCALITY_CUTOFF_MULTIPLIER =
@@ -74,14 +82,17 @@ public class TableSnapshotInputFormatImpl {
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
+
private HTableDescriptor htd;
private HRegionInfo regionInfo;
private String[] locations;
+ private String scan;
+ private String restoreDir;
// constructor for mapreduce framework / Writable
public InputSplit() {}
- public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
+ public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations, Scan scan, Path restoreDir) {
this.htd = htd;
this.regionInfo = regionInfo;
if (locations == null || locations.isEmpty()) {
@@ -89,6 +100,25 @@ public class TableSnapshotInputFormatImpl {
} else {
this.locations = locations.toArray(new String[locations.size()]);
}
+ try {
+ this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
+ } catch (IOException e) {
+ LOG.warn("Failed to convert Scan to String", e);
+ }
+
+ this.restoreDir = restoreDir.toString();
+ }
+
+ public HTableDescriptor getHtd() {
+ return htd;
+ }
+
+ public String getScan() {
+ return scan;
+ }
+
+ public String getRestoreDir() {
+ return restoreDir;
}
public long getLength() {
@@ -128,6 +158,10 @@ public class TableSnapshotInputFormatImpl {
byte[] buf = baos.toByteArray();
out.writeInt(buf.length);
out.write(buf);
+
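+ // append the serialized scan and the restore directory after the protobuf-encoded region split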
+ Bytes.writeByteArray(out, Bytes.toBytes(scan));
+ Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+
}
@Override
@@ -140,6 +174,9 @@ public class TableSnapshotInputFormatImpl {
this.regionInfo = HRegionInfo.convert(split.getRegion());
 List<String> locationsList = split.getLocationsList();
this.locations = locationsList.toArray(new String[locationsList.size()]);
+
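+ // read back the scan string and restore directory in the order they were written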
+ this.scan = Bytes.toString(Bytes.readByteArray(in));
+ this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
}
}
@@ -158,28 +195,12 @@ public class TableSnapshotInputFormatImpl {
}
public void initialize(InputSplit split, Configuration conf) throws IOException {
+ this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split;
HTableDescriptor htd = split.htd;
HRegionInfo hri = this.split.getRegionInfo();
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
- Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY)); // This is the user specified root
- // directory where snapshot was restored
-
- // create scan
- // TODO: mapred does not support scan as input API. Work around for now.
- if (conf.get(TableInputFormat.SCAN) != null) {
- scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
- } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
- String[] columns =
- conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
- scan = new Scan();
- for (String col : columns) {
- scan.addFamily(Bytes.toBytes(col));
- }
- } else {
- throw new IllegalArgumentException("A Scan is not configured for this job");
- }
// region is immutable, this should be fine,
// otherwise we have to set the thread read point
@@ -187,7 +208,7 @@ public class TableSnapshotInputFormatImpl {
// disable caching of data blocks
scan.setCacheBlocks(false);
- scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+ scanner = new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
}
public boolean nextKeyValue() throws IOException {
@@ -233,18 +254,39 @@ public class TableSnapshotInputFormatImpl {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
- Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
- SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
- SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+ SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
+
+ List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+
+ // TODO: mapred does not support scan as input API. Work around for now.
+ Scan scan = extractScanFromConf(conf);
+ // the temp dir where the snapshot is restored
+ Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+
+ return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+ }
+
+ public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
 List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
if (regionManifests == null) {
throw new IllegalArgumentException("Snapshot seems empty");
}
- // load table descriptor
- HTableDescriptor htd = manifest.getTableDescriptor();
+ List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
- // TODO: mapred does not support scan as input API. Work around for now.
+ for (SnapshotRegionManifest regionManifest : regionManifests) {
+ regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
+ }
+ return regionInfos;
+ }
+
+ public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, Path rootDir, FileSystem fs) throws IOException {
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+ return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+ }
+
+ public static Scan extractScanFromConf(Configuration conf) throws IOException {
Scan scan = null;
if (conf.get(TableInputFormat.SCAN) != null) {
scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
@@ -258,29 +300,39 @@ public class TableSnapshotInputFormatImpl {
} else {
throw new IllegalArgumentException("Unable to create scan");
}
- // the temp dir where the snapshot is restored
- Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+ return scan;
+ }
+
+ public static List<InputSplit> getSplits(Scan scan,
+ SnapshotManifest manifest,
+ List<HRegionInfo> regionManifests,
+ Path restoreDir,
+ Configuration conf
+ ) throws IOException {
+ // load table descriptor
+ HTableDescriptor htd = manifest.getTableDescriptor();
+
Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 List<InputSplit> splits = new ArrayList<InputSplit>();
- for (SnapshotRegionManifest regionManifest : regionManifests) {
+ for (HRegionInfo hri : regionManifests) {
// load region descriptor
- HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
- hri.getStartKey(), hri.getEndKey())) {
+ hri.getStartKey(), hri.getEndKey())) {
// compute HDFS locations from snapshot files (which will get the locations for
// referred hfiles)
 List<String> hosts = getBestLocations(conf,
- HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
int len = Math.min(3, hosts.size());
hosts = hosts.subList(0, len);
- splits.add(new InputSplit(htd, hri, hosts));
+ splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
return splits;
+
}
/**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..f526509
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,107 @@
+package org.apache.hadoop.hbase.mapred;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.*;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestMultiTableSnapshotInputFormat extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
+
+ @Override
+ protected void runJob(String jobName, Configuration c, List<Scan> scans) throws IOException, InterruptedException, ClassNotFoundException {
+ JobConf job = new JobConf(TEST_UTIL.getConfiguration());
+
+ job.setJobName(jobName);
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
+ TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ getSnapshotScanMapping(scans), Mapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job,
+ true, restoreDir
+ );
+
+ TableMapReduceUtil.addDependencyJars(job);
+
+ job.setReducerClass(Reducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+
+ RunningJob runningJob = JobClient.runJob(job);
+ runningJob.waitForCompletion();
+ assertTrue(runningJob.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+ public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector, Reporter reporter) throws IOException {
+ makeAssertions(key, value);
+ outputCollector.collect(key, key);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+
+ }
+ }
+
+ public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+
+ private JobConf jobConf;
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values, OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter) throws IOException {
+ makeAssertions(key, Lists.newArrayList(values));
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ super.cleanup(this.jobConf);
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
new file mode 100644
index 0000000..54406b6
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java
@@ -0,0 +1,252 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base set of tests and setup for input formats touching multiple tables.
+ */
+public abstract class MultiTableInputFormatTestBase {
+ static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class);
+ public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ static final String TABLE_NAME = "scantest";
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stpRow";
+
+ static List<String> TABLES = Lists.newArrayList();
+
+ static {
+ for (int i = 0; i < 3; i++) {
+ TABLES.add(TABLE_NAME + String.valueOf(i));
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // switch TIF to log at DEBUG level
+ TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+ // create and fill table
+ for (String tableName : TABLES) {
+ try (HTable table =
+ TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName),
+ INPUT_FAMILY, 4)) {
+ TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
+ }
+ }
+ // start MR cluster
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Configuration c = TEST_UTIL.getConfiguration();
+ FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+ }
+
+ @Test
+ public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, null, null);
+ }
+
+ @Test
+ public void testScanEmptyToAPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ @Test
+ public void testScanOBBToOPP() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("obb", "opp", "opo");
+ }
+
+ @Test
+ public void testScanYZYToEmpty() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ testScan("yzy", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ private void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName =
+ "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+ (stop != null ? stop.toUpperCase() : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ c.set(KEY_STARTROW, start != null ? start : "");
+ c.set(KEY_LASTROW, last != null ? last : "");
+
+ List<Scan> scans = new ArrayList<Scan>();
+
+ for(String tableName : TABLES){
+ Scan scan = new Scan();
+
+ scan.addFamily(INPUT_FAMILY);
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
+
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
+
+ scans.add(scan);
+
+ LOG.info("scan before: " + scan);
+ }
+
+ runJob(jobName, c, scans);
+ }
+
+ protected void runJob(String jobName, Configuration c, List<Scan> scans) throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = new Job(c, jobName);
+
+ initJob(scans, job);
+ job.setReducerClass(ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ job.waitForCompletion(true);
+ assertTrue(job.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+ protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
+
+ /**
+ * Pass the key and value to reducer.
+ */
+ public static class ScanMapper extends
+ TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+ /**
+ * Pass the key and value to reduce.
+ *
+ * @param key The key, here "aaa", "aab" etc.
+ * @param value The value is the same as the key.
+ * @param context The task context.
+ * @throws java.io.IOException When reading the rows fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ makeAssertions(key, value);
+ context.write(key, key);
+ }
+
+ public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+ value.getMap();
+ if (!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+ LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+ ", value -> " + val);
+ }
+ }
+
+ /**
+ * Checks the last and first keys seen against the scanner boundaries.
+ */
+ public static class ScanReducer
+ extends
+ Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+ private String first = null;
+ private String last = null;
+
+ @Override
+ protected void reduce(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values, Context context)
+ throws IOException, InterruptedException {
+ makeAssertions(key, values);
+ }
+
+ protected void makeAssertions(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values) {
+ int count = 0;
+ for (ImmutableBytesWritable value : values) {
+ String val = Bytes.toStringBinary(value.get());
+ LOG.debug("reduce: key[" + count + "] -> " +
+ Bytes.toStringBinary(key.get()) + ", value -> " + val);
+ if (first == null) first = val;
+ last = val;
+ count++;
+ }
+ assertEquals(3, count);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ Configuration c = context.getConfiguration();
+ cleanup(c);
+ }
+
+ protected void cleanup(Configuration c) {
+ String startRow = c.get(KEY_STARTROW);
+ String lastRow = c.get(KEY_LASTROW);
+ LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+ startRow + "\"");
+ LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+ "\"");
+ if (startRow != null && startRow.length() > 0) {
+ assertEquals(startRow, first);
+ }
+ if (lastRow != null && lastRow.length() > 0) {
+ assertEquals(lastRow, last);
+ }
+ }
+ }
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
index 3226cc6..d0c9d99 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
@@ -57,196 +57,16 @@ import org.junit.experimental.categories.Category;
* too.
*/
@Category({VerySlowMapReduceTests.class, LargeTests.class})
-public class TestMultiTableInputFormat {
-
- static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
- static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- static final String TABLE_NAME = "scantest";
- static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
- static final String KEY_STARTROW = "startRow";
- static final String KEY_LASTROW = "stpRow";
+public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {
@BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // switch TIF to log at DEBUG level
+ public static void setupLogging() {
TEST_UTIL.enableDebug(MultiTableInputFormat.class);
- TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
- // start mini hbase cluster
- TEST_UTIL.startMiniCluster(3);
- // create and fill table
- for (int i = 0; i < 3; i++) {
- try (HTable table =
- TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME + String.valueOf(i)),
- INPUT_FAMILY, 4)) {
- TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
- }
- }
- // start MR cluster
- TEST_UTIL.startMiniMapReduceCluster();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniMapReduceCluster();
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @After
- public void tearDown() throws Exception {
- Configuration c = TEST_UTIL.getConfiguration();
- FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
}
- /**
- * Pass the key and value to reducer.
- */
- public static class ScanMapper extends
- TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
- /**
- * Pass the key and value to reduce.
- *
- * @param key The key, here "aaa", "aab" etc.
- * @param value The value is the same as the key.
- * @param context The task context.
- * @throws IOException When reading the rows fails.
- */
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- if (value.size() != 1) {
- throw new IOException("There should only be one input column");
- }
- Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
- value.getMap();
- if (!cf.containsKey(INPUT_FAMILY)) {
- throw new IOException("Wrong input columns. Missing: '" +
- Bytes.toString(INPUT_FAMILY) + "'.");
- }
- String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
- LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
- ", value -> " + val);
- context.write(key, key);
- }
- }
-
- /**
- * Checks the last and first keys seen against the scanner boundaries.
- */
- public static class ScanReducer
- extends
- Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
- private String first = null;
- private String last = null;
-
- @Override
- protected void reduce(ImmutableBytesWritable key,
- Iterable<ImmutableBytesWritable> values, Context context)
- throws IOException, InterruptedException {
- int count = 0;
- for (ImmutableBytesWritable value : values) {
- String val = Bytes.toStringBinary(value.get());
- LOG.debug("reduce: key[" + count + "] -> " +
- Bytes.toStringBinary(key.get()) + ", value -> " + val);
- if (first == null) first = val;
- last = val;
- count++;
- }
- assertEquals(3, count);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException {
- Configuration c = context.getConfiguration();
- String startRow = c.get(KEY_STARTROW);
- String lastRow = c.get(KEY_LASTROW);
- LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
- startRow + "\"");
- LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
- "\"");
- if (startRow != null && startRow.length() > 0) {
- assertEquals(startRow, first);
- }
- if (lastRow != null && lastRow.length() > 0) {
- assertEquals(lastRow, last);
- }
- }
- }
-
- @Test
- public void testScanEmptyToEmpty() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan(null, null, null);
- }
-
- @Test
- public void testScanEmptyToAPP() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan(null, "app", "apo");
- }
-
- @Test
- public void testScanOBBToOPP() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan("obb", "opp", "opo");
- }
-
- @Test
- public void testScanYZYToEmpty() throws IOException, InterruptedException,
- ClassNotFoundException {
- testScan("yzy", null, "zzz");
- }
-
- /**
- * Tests a MR scan using specific start and stop rows.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- private void testScan(String start, String stop, String last)
- throws IOException, InterruptedException, ClassNotFoundException {
- String jobName =
- "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
- (stop != null ? stop.toUpperCase() : "Empty");
- LOG.info("Before map/reduce startup - job " + jobName);
- Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-
- c.set(KEY_STARTROW, start != null ? start : "");
- c.set(KEY_LASTROW, last != null ? last : "");
-
- List<Scan> scans = new ArrayList<Scan>();
-
- for(int i=0; i<3; i++){
- Scan scan = new Scan();
-
- scan.addFamily(INPUT_FAMILY);
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
-
- if (start != null) {
- scan.setStartRow(Bytes.toBytes(start));
- }
- if (stop != null) {
- scan.setStopRow(Bytes.toBytes(stop));
- }
-
- scans.add(scan);
-
- LOG.info("scan before: " + scan);
- }
-
- Job job = new Job(c, jobName);
-
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
- job.setReducerClass(ScanReducer.class);
- job.setNumReduceTasks(1); // one to get final "first" and "last" key
- FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
- LOG.info("Started " + job.getJobName());
- job.waitForCompletion(true);
- assertTrue(job.isSuccessful());
- LOG.info("After map/reduce completion - job " + jobName);
}
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..924e53a
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,79 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+ protected Path restoreDir;
+
+ @BeforeClass
+ public static void setUpSnapshots() throws Exception {
+
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+ TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+ // take a snapshot of every table we have.
+ for (String tableName : TABLES) {
+ SnapshotTestingUtils.createSnapshotAndValidate(
+ TEST_UTIL.getHBaseAdmin(), TableName.valueOf(tableName),
+ ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null,
+ snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()), TEST_UTIL.getTestFileSystem(), true);
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.restoreDir = new Path("/tmp");
+
+ }
+
+ @Override
+ protected void initJob(List<Scan> scans, Job job) throws IOException {
+ TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ getSnapshotScanMapping(scans), ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job,
+ true, restoreDir
+ );
+ }
+
+ protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
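+ // group each scan under the snapshot of the table it targets (taken from the Scan's table-name attribute)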
+ return Multimaps.index(scans, new Function<Scan, String>() {
+ @Nullable
+ @Override
+ public String apply(Scan input) {
+ return snapshotNameForTable(Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+ }
+ }).asMap();
+ }
+
+ public static String snapshotNameForTable(String tableName) {
+ return tableName + "_snapshot";
+ }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..d7867bb
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,157 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({SmallTests.class})
+public class TestMultiTableSnapshotInputFormatImpl {
+
+ private MultiTableSnapshotInputFormatImpl subject;
+ private Map<String, Collection<Scan>> snapshotScans;
+ private Path restoreDir;
+ private Configuration conf;
+ private Path rootDir;
+
+ @Before
+ public void setUp() throws Exception {
+ this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+ // mock out restoreSnapshot
+ // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper dependency into the
+ // input format. However, we need a new RestoreSnapshotHelper per snapshot in the current design, and it *also*
+ // feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would probably be the more "pure"
+ // way of doing things. This is the lesser of two evils, perhaps?
+ doNothing().when(this.subject).
+ restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class), any(Path.class), any(FileSystem.class)) ;
+
+ this.conf = new Configuration();
+ this.rootDir = new Path("file:///test-root-dir");
+ FSUtils.setRootDir(conf, rootDir);
+ this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of(
+ "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))),
+ "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+ new Scan(Bytes.toBytes("5"), Bytes.toBytes("6")))
+ );
+
+ this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+ }
+
+ public void callSetInput() throws IOException {
+ subject.setInput(this.conf, snapshotScans, restoreDir);
+ }
+
+ public Map<String, Collection<ScanWithEquals>> toScanWithEquals(Map<String, Collection<Scan>> snapshotScans) throws IOException {
+ Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+ for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+ List<ScanWithEquals> scans = Lists.newArrayList();
+
+ for (Scan scan : entry.getValue()) {
+ scans.add(new ScanWithEquals(scan));
+ }
+ rtn.put(entry.getKey(), scans);
+ }
+
+ return rtn;
+ }
+
+ public static class ScanWithEquals {
+
+ private final String startRow;
+ private final String stopRow;
+
+ /**
+ * Creates a new instance of this class while copying all values.
+ *
+ * @param scan The scan instance to copy from.
+ * @throws java.io.IOException When copying the values fails.
+ */
+ public ScanWithEquals(Scan scan) throws IOException {
+ this.startRow = Bytes.toStringBinary(scan.getStartRow());
+ this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (! (obj instanceof ScanWithEquals)) {
+ return false;
+ }
+ ScanWithEquals otherScan = (ScanWithEquals) obj;
+ return Objects.equals(this.startRow, otherScan.startRow) && Objects.equals(this.stopRow, otherScan.stopRow);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("startRow", startRow)
+ .add("stopRow", stopRow)
+ .toString();
+ }
+ }
+
+ @Test
+ public void testSetInputSetsSnapshotToScans() throws Exception {
+
+ callSetInput();
+
+ Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+ // convert to scans we can use .equals on
+ Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+ Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+ assertEquals(expectedWithEquals, actualWithEquals);
+ }
+
+ @Test
+ public void testSetInputPushesRestoreDirectories() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+ }
+
+ @Test
+ public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+ callSetInput();
+
+ Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+ for (Path snapshotDir : restoreDirs.values()) {
+ assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir, snapshotDir.getParent());
+ }
+ }
+
+ @Test
+ public void testSetInputRestoresSnapshots() throws Exception {
+ callSetInput();
+
+ Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+ for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+ verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir), eq(entry.getValue()), any(FileSystem.class));
+ }
+ }
+}