Index: hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java	(revision 1590805)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java	(working copy)
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -62,6 +64,7 @@
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.hbase.util.hadoopbackport.ThrottledInputStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -86,6 +89,7 @@
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
+  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
 
@@ -225,7 +229,11 @@
        }
      }
 
-      FSDataInputStream in = openSourceFile(context, inputPath);
+      InputStream in = openSourceFile(context, inputPath);
+      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
+      if (Integer.MAX_VALUE != bandwidthMB) {
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+      }
 
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
@@ -298,7 +306,7 @@
     }
 
     private void copyData(final Context context,
-        final Path inputPath, final FSDataInputStream in,
+        final Path inputPath, final InputStream in,
         final Path outputPath, final FSDataOutputStream out,
         final long inputFileSize)
         throws IOException {
@@ -585,7 +593,8 @@
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+      final int mappers, final int bandwidthMB)
+      throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -594,6 +603,7 @@
     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
     conf.setInt("mapreduce.job.maps", mappers);
+    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
 
     Job job = new Job(conf);
     job.setJobName("ExportSnapshot");
@@ -655,6 +665,7 @@
     String filesGroup = null;
     String filesUser = null;
     Path outputRoot = null;
+    int bandwidthMB = Integer.MAX_VALUE;
     int filesMode = 0;
     int mappers = 0;
 
@@ -681,6 +692,8 @@
         filesUser = args[++i];
       } else if (cmd.equals("-chgroup")) {
         filesGroup = args[++i];
+      } else if (cmd.equals("-bandwidth")) {
+        bandwidthMB = Integer.parseInt(args[++i]);
       } else if (cmd.equals("-chmod")) {
         filesMode = Integer.parseInt(args[++i], 8);
       } else if (cmd.equals("-overwrite")) {
@@ -773,7 +786,7 @@
       LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
     } else {
       runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
-                 filesUser, filesGroup, filesMode, mappers);
+                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
     }
 
     // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/hadoopbackport/ThrottledInputStream.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/hadoopbackport/ThrottledInputStream.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/hadoopbackport/ThrottledInputStream.java	(revision 0)
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.hadoopbackport;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottleInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ */
+public class ThrottledInputStream extends InputStream {
+
+  private final InputStream rawStream;
+  private final long maxBytesPerSec;
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  private static final long SLEEP_DURATION_MS = 50;
+
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  private void throttle() throws IOException {
+    if (getBytesPerSec() > maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) {
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter for the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}