Index: hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java	(revision 1590241)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java	(working copy)
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -62,6 +64,7 @@
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -86,6 +89,7 @@
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
+  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
 
@@ -225,7 +229,13 @@
         }
       }
 
-      FSDataInputStream in = openSourceFile(context, inputPath);
+      InputStream in = openSourceFile(context, inputPath);
+      // Integer.MAX_VALUE (also the -bandwidth default) means "no throttling".
+      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, Integer.MAX_VALUE);
+      if (Integer.MAX_VALUE != bandwidthMB) {
+        // Widen to long before scaling MB to bytes: bandwidthMB * 1024 * 1024 overflows int at 2048.
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
+      }
 
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
@@ -298,7 +308,7 @@
     }
 
     private void copyData(final Context context,
-        final Path inputPath, final FSDataInputStream in,
+        final Path inputPath, final InputStream in,
         final Path outputPath, final FSDataOutputStream out,
         final long inputFileSize)
         throws IOException {
@@ -585,7 +595,8 @@
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+      final int mappers, final int bandwidthMB)
+          throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -594,6 +605,7 @@
     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
     conf.setInt("mapreduce.job.maps", mappers);
+    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
 
     Job job = new Job(conf);
     job.setJobName("ExportSnapshot");
@@ -655,6 +667,7 @@
     String filesGroup = null;
     String filesUser = null;
     Path outputRoot = null;
+    int bandwidthMB = Integer.MAX_VALUE;
     int filesMode = 0;
     int mappers = 0;
 
@@ -681,6 +694,8 @@
           filesUser = args[++i];
         } else if (cmd.equals("-chgroup")) {
           filesGroup = args[++i];
+        } else if (cmd.equals("-bandwidth")) {
+          bandwidthMB = Integer.parseInt(args[++i]);
         } else if (cmd.equals("-chmod")) {
           filesMode = Integer.parseInt(args[++i], 8);
         } else if (cmd.equals("-overwrite")) {
@@ -773,7 +788,7 @@
         LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
       } else {
         runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
-                   filesUser, filesGroup, filesMode, mappers);
+                   filesUser, filesGroup, filesMode, mappers, bandwidthMB);
       }
 
       // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
Index: hbase-server/pom.xml
===================================================================
--- hbase-server/pom.xml	(revision 1590241)
+++ hbase-server/pom.xml	(working copy)
@@ -308,6 +308,11 @@
       <artifactId>commons-collections</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop-two.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
     </dependency>