diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java
new file mode 100644
index 0000000..98e757a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+
+/**
+ * Convert HBase tabular data into a format that is consumable by Map/Reduce.
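+ * <p>
+ * An illustrative job setup (the table, mapper and key/value classes below are
+ * hypothetical placeholders, not part of this API):
+ * <pre>
+ * Scan scan = new Scan();
+ * scan.setCaching(500);
+ * TableMultiRegionMapReduceUtil.initTableMapperJob(
+ *     "mytable",          // hypothetical input table
+ *     scan,
+ *     MyMapper.class,     // hypothetical mapper extending TableMapper
+ *     Text.class,
+ *     IntWritable.class,
+ *     job);
+ * </pre>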
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TableMultiRegionInputFormat extends TableMultiRegionInputFormatBase
+implements Configurable {
+
+ private final Log LOG = LogFactory.getLog(TableMultiRegionInputFormat.class);
+
+ /** Job parameter that specifies the input table. */
+ public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
+ /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
+ * See {@link org.apache.hadoop.hbase.mapreduce.TableMultiRegionMapReduceUtil#convertScanToString(org.apache.hadoop.hbase.client.Scan)} for more details.
+ */
+ public static final String SCAN = "hbase.mapreduce.scan";
+ /** Scan start row */
+ public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
+ /** Scan stop row */
+ public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
+ /** Column Family to Scan */
+ public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
+ /** Space delimited list of columns and column families to scan. */
+ public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
+ /** The timestamp used to filter columns with a specific timestamp. */
+ public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
+ /** The starting timestamp used to filter columns with a specific range of versions. */
+ public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
+ /** The ending timestamp used to filter columns with a specific range of versions. */
+ public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
+ /** The maximum number of version to return. */
+ public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
+ /** Set to false to disable server-side caching of blocks for this scan. */
+ public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
+ /** The number of rows for caching that will be passed to scanners. */
+ public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
+ /** Set the maximum number of values to return for each call to next(). */
+ public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
+
+ /** The configuration. */
+ private Configuration conf = null;
+
+ /**
+ * Returns the current configuration.
+ *
+ * @return The current configuration.
+ * @see org.apache.hadoop.conf.Configurable#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration. This is used to set the details for the table to
+ * be scanned.
+ *
+ * @param configuration The configuration to set.
+ * @see org.apache.hadoop.conf.Configurable#setConf(
+ * org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ String tableName = conf.get(INPUT_TABLE);
+ try {
+ setHTable(new HTable(new Configuration(conf), tableName));
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ Scan scan = null;
+
+ if (conf.get(SCAN) != null) {
+ try {
+ scan = TableMultiRegionMapReduceUtil.convertStringToScan(conf.get(SCAN));
+ } catch (IOException e) {
+ LOG.error("An error occurred.", e);
+ }
+ } else {
+ try {
+ scan = new Scan();
+
+ if (conf.get(SCAN_ROW_START) != null) {
+ scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
+ }
+
+ if (conf.get(SCAN_ROW_STOP) != null) {
+ scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
+ }
+
+ if (conf.get(SCAN_COLUMNS) != null) {
+ addColumns(scan, conf.get(SCAN_COLUMNS));
+ }
+
+ if (conf.get(SCAN_COLUMN_FAMILY) != null) {
+ scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
+ }
+
+ if (conf.get(SCAN_TIMESTAMP) != null) {
+ scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
+ }
+
+ if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
+ scan.setTimeRange(
+ Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
+ Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
+ }
+
+ if (conf.get(SCAN_MAXVERSIONS) != null) {
+ scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
+ }
+
+ if (conf.get(SCAN_CACHEDROWS) != null) {
+ scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
+ }
+
+ if (conf.get(SCAN_BATCHSIZE) != null) {
+ scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
+ }
+
+ // false by default, full table scans generate too much BC churn
+ scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ }
+
+ setScan(scan);
+ }
+
+ /**
+ * Parses a combined family and qualifier and adds either both or just the
+ * family in case there is no qualifier. This assumes the older colon
+ * divided notation, e.g. "family:qualifier".
+ *
+ * @param scan The Scan to update.
+ * @param familyAndQualifier family and qualifier
+ * @throws IllegalArgumentException When familyAndQualifier is invalid.
+ */
+ private static void addColumn(Scan scan, byte[] familyAndQualifier) {
+ byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
+ if (fq.length == 1) {
+ scan.addFamily(fq[0]);
+ } else if (fq.length == 2) {
+ scan.addColumn(fq[0], fq[1]);
+ } else {
+ throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+ }
+ }
+
+ /**
+ * Adds an array of columns specified using old format, family:qualifier.
+ *
+ * Overrides previous calls to {@link org.apache.hadoop.hbase.client.Scan#addColumn(byte[], byte[])}
+ * for any families in the input.
+ *
+ * @param scan The Scan to update.
+ * @param columns array of columns, formatted as family:qualifier
+ * @see org.apache.hadoop.hbase.client.Scan#addColumn(byte[], byte[])
+ */
+ public static void addColumns(Scan scan, byte [][] columns) {
+ for (byte[] column : columns) {
+ addColumn(scan, column);
+ }
+ }
+
+ /**
+ * Convenience method to parse a string representation of an array of column specifiers.
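+ * For example, a (hypothetical) value of "cf1:qual1 cf2" adds the single column
+ * cf1:qual1 and the whole family cf2 to the scan.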
+ *
+ * @param scan The Scan to update.
+ * @param columns The columns to parse.
+ */
+ private static void addColumns(Scan scan, String columns) {
+ String[] cols = columns.split(" ");
+ for (String col : cols) {
+ addColumn(scan, Bytes.toBytes(col));
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java
new file mode 100644
index 0000000..01a610b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
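+/**
+ * A base for multi-region table {@link org.apache.hadoop.mapreduce.InputFormat}s. Subclasses
+ * (such as {@link org.apache.hadoop.hbase.mapreduce.TableMultiRegionInputFormat}) are expected
+ * to supply an {@link org.apache.hadoop.hbase.client.HTable} and a
+ * {@link org.apache.hadoop.hbase.client.Scan} via {@link #setHTable(HTable)} and
+ * {@link #setScan(Scan)} before splits are calculated, typically from
+ * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}.
+ */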
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class TableMultiRegionInputFormatBase
+extends InputFormat<ImmutableBytesWritable, Result> {
+
+ final Log LOG = LogFactory.getLog(TableMultiRegionInputFormatBase.class);
+
+ /** Holds the details for the internal scanner. */
+ private Scan scan = null;
+ /** The table to scan. */
+ private HTable table = null;
+ /** The reader scanning the table, can be a custom one. */
+ private TableRecordReader tableRecordReader = null;
+
+
+ /** The reverse DNS lookup cache mapping: IPAddress => HostName */
+ private HashMap<InetAddress, String> reverseDNSCacheMap =
+ new HashMap<InetAddress, String>();
+
+ /** The NameServer address */
+ private String nameServer = null;
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+ * the default.
+ *
+ * @param split The split to work with.
+ * @param context The current context.
+ * @return The newly created record reader.
+ * @throws java.io.IOException When creating the reader fails.
+ * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+ * org.apache.hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ if (table == null) {
+ throw new IOException("Cannot create a record reader because of a" +
+ " previous error. Please look at the previous logs lines from" +
+ " the task's full log for more details.");
+ }
+ TableSplit tSplit = (TableSplit) split;
+ LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
+ TableRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new TableRecordReader();
+ }
+ Scan sc = new Scan(this.scan);
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ trr.setScan(sc);
+ trr.setHTable(table);
+ return trr;
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks. Each split groups
+ * up to <code>hbase.mapreduce.scan.regionspermapper</code> consecutive regions of the
+ * table, so by default (one region per mapper) the number of splits matches the number
+ * of regions.
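+ * <p>
+ * An illustrative configuration snippet (the value shown is only an example):
+ * <pre>
+ * &lt;property&gt;
+ *   &lt;name&gt;hbase.mapreduce.scan.regionspermapper&lt;/name&gt;
+ *   &lt;value&gt;3&lt;/value&gt;
+ * &lt;/property&gt;
+ * </pre>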
+ *
+ * @param context The current job context.
+ * @return The list of input splits.
+ * @throws java.io.IOException When creating the list of splits fails.
+ * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+ * org.apache.hadoop.mapreduce.JobContext)
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ if (table == null) {
+ throw new IOException("No table was provided.");
+ }
+ // Get the name server address and the default value is null.
+ this.nameServer =
+ context.getConfiguration().get("hbase.nameserver.address", null);
+
+ // Get the number of regions per mapper; the default value is 1.
+ String regionPerMapper = context.getConfiguration().get("hbase.mapreduce.scan.regionspermapper", "1");
+ int regionPerMapperInt = 1;
+ try {
+ regionPerMapperInt = Integer.parseInt(regionPerMapper);
+ } catch (NumberFormatException e) {
+ LOG.error("hbase.mapreduce.scan.regionspermapper must be an integer, but was \"" + regionPerMapper + "\"; using 1.");
+ }
+ if (regionPerMapperInt < 1) {
+ LOG.warn("hbase.mapreduce.scan.regionspermapper must be >= 1, but was " + regionPerMapperInt + "; using 1.");
+ regionPerMapperInt = 1;
+ }
+ LOG.debug("Number of regions per mapper: " + regionPerMapperInt);
+ RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);
+
+ Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+ if (keys == null || keys.getFirst() == null ||
+ keys.getFirst().length == 0) {
+ HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+ List<InputSplit> splits = new ArrayList<InputSplit>(1);
+ long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+ TableSplit split = new TableSplit(table.getName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+ .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
+ splits.add(split);
+ return splits;
+ }
+ // The number of splits is the number of regions divided by regions-per-mapper, rounded up.
+ int splitsNumber = (int) Math.ceil((double) keys.getFirst().length / regionPerMapperInt);
+ List<InputSplit> splits = new ArrayList<InputSplit>(splitsNumber);
+ for (int i = 0; i*regionPerMapperInt < keys.getFirst().length; i++) {
+ int startRegion = i * regionPerMapperInt;
+ // Last region covered by this split, clamped to the last region of the table.
+ int stopRegion = Math.min(startRegion + regionPerMapperInt - 1, keys.getFirst().length - 1);
+ ArrayList<String> regionLocationList = new ArrayList<String>();
+ ArrayList<byte[]> regionNameList = new ArrayList<byte[]>();
+ for (int j=startRegion;j<=stopRegion;j++) {
+
+ HRegionLocation location = table.getRegionLocation(keys.getFirst()[j], false);
+ // The below InetSocketAddress creation does a name resolution.
+ InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+ if (isa.isUnresolved()) {
+ LOG.warn("Failed resolve " + isa);
+ }
+ InetAddress regionAddress = isa.getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
+ regionLocation = location.getHostname();
+ }
+ regionLocationList.add(regionLocation);
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ regionNameList.add(regionName);
+
+ }
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+ // determine if the given start and stop keys fall into the region range
+ if ((startRow.length == 0 || keys.getSecond()[stopRegion].length == 0 ||
+ Bytes.compareTo(startRow, keys.getSecond()[stopRegion]) < 0) &&
+ (stopRow.length == 0 ||
+ Bytes.compareTo(stopRow, keys.getFirst()[startRegion]) > 0)) {
+ byte[] splitStart = startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[startRegion], startRow) >= 0 ?
+ keys.getFirst()[startRegion] : startRow;
+ byte[] splitStop = (stopRow.length == 0 ||
+ Bytes.compareTo(keys.getSecond()[stopRegion], stopRow) <= 0) &&
+ keys.getSecond()[stopRegion].length > 0 ?
+ keys.getSecond()[stopRegion] : stopRow;
+
+ long regionSize = 0;
+ for (int k = 0; k < regionNameList.size(); k++) {
+ regionSize += sizeCalculator.getRegionSize(regionNameList.get(k));
+ }
+ // Assumption: use the first grouped region's host as the split's preferred location.
+ TableSplit split = new TableSplit(table.getName(),
+ splitStart, splitStop, regionLocationList.get(0), regionSize);
+ splits.add(split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + i + " -> " + split);
+ }
+ }
+ }
+ return splits;
+ }
+
+ private String reverseDNS(InetAddress ipAddress) throws NamingException {
+ String hostName = this.reverseDNSCacheMap.get(ipAddress);
+ if (hostName == null) {
+ hostName = Strings.domainNamePointerToHostName(
+ DNS.reverseDns(ipAddress, this.nameServer));
+ this.reverseDNSCacheMap.put(ipAddress, hostName);
+ }
+ return hostName;
+ }
+
+ /**
+ * Test if the given region is to be included in the InputSplit while splitting
+ * the regions of a table.
+ * <p>
+ * This optimization is effective when there is a specific reasoning to exclude an entire
+ * region from the M-R job, (and hence, not contributing to the InputSplit), given the
+ * start and end keys of the same. Useful when we need to remember the last-processed
+ * top record and revisit the [last, current) interval for M-R processing, continuously.
+ * In addition to reducing InputSplits, reduces the load on the region server as well,
+ * due to the ordering of the keys.
+ * <p>
+ * Note: It is possible that <code>endKey.length() == 0</code>, for the last (recent) region.
+ * <p>
+ * Override this method, if you want to bulk exclude regions altogether from M-R.
+ * By default, no region is excluded (i.e. all regions are included).
+ *
+ *
+ * @param startKey Start key of the region
+ * @param endKey End key of the region
+ * @return true, if this region needs to be included as part of the input (default).
+ *
+ */
+ protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
+ return true;
+ }
+
+ /**
+ * Allows subclasses to get the {@link org.apache.hadoop.hbase.client.HTable}.
+ */
+ protected HTable getHTable() {
+ return this.table;
+ }
+
+ /**
+ * Allows subclasses to set the {@link org.apache.hadoop.hbase.client.HTable}.
+ *
+ * @param table The table to get the data from.
+ */
+ protected void setHTable(HTable table) {
+ this.table = table;
+ }
+
+ /**
+ * Gets the scan defining the actual details like columns etc.
+ *
+ * @return The internal scan instance.
+ */
+ public Scan getScan() {
+ if (this.scan == null) this.scan = new Scan();
+ return scan;
+ }
+
+ /**
+ * Sets the scan defining the actual details like columns etc.
+ *
+ * @param scan The scan to set.
+ */
+ public void setScan(Scan scan) {
+ this.scan = scan;
+ }
+
+ /**
+ * Allows subclasses to set the {@link org.apache.hadoop.hbase.mapreduce.TableRecordReader}.
+ *
+ * @param tableRecordReader A different {@link org.apache.hadoop.hbase.mapreduce.TableRecordReader}
+ * implementation.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java
new file mode 100644
index 0000000..6d847d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java
@@ -0,0 +1,956 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.*;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Utility for {@link org.apache.hadoop.hbase.mapreduce.TableMapper} and {@link org.apache.hadoop.hbase.mapreduce.TableReducer}
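+ * <p>
+ * An illustrative sketch of the reduce-side setup (table and reducer names are hypothetical):
+ * <pre>
+ * TableMultiRegionMapReduceUtil.initTableReducerJob(
+ *     "outputTable",        // hypothetical output table
+ *     MyTableReducer.class, // hypothetical reducer extending TableReducer
+ *     job);
+ * </pre>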
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TableMultiRegionMapReduceUtil {
+ static Log LOG = LogFactory.getLog(TableMultiRegionMapReduceUtil.class);
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job)
+ throws IOException {
+ initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
+ job, true);
+ }
+
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(TableName table,
+ Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass,
+ Job job) throws IOException {
+ initTableMapperJob(table.getNameAsString(),
+ scan,
+ mapper,
+ outputKeyClass,
+ outputValueClass,
+ job,
+ true);
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table Binary representation of the table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(byte[] table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job)
+ throws IOException {
+ initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
+ job, true);
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+ throws IOException {
+ initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, true, inputFormatClass);
+ }
+
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param initCredentials whether to initialize hbase auth credentials for the job
+ * @param inputFormatClass the input format
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, boolean initCredentials,
+ Class<? extends InputFormat> inputFormatClass)
+ throws IOException {
+ job.setInputFormatClass(inputFormatClass);
+ if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
+ if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
+ job.setMapperClass(mapper);
+ if (Put.class.equals(outputValueClass)) {
+ job.setCombinerClass(PutCombiner.class);
+ }
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+ conf.set(TableMultiRegionInputFormat.INPUT_TABLE, table);
+ conf.set(TableMultiRegionInputFormat.SCAN, convertScanToString(scan));
+ conf.setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+ if (initCredentials) {
+ initCredentials(job);
+ }
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table Binary representation of the table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param inputFormatClass The class of the input format
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(byte[] table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
+ throws IOException {
+ initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, inputFormatClass);
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table Binary representation of the table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(byte[] table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars)
+ throws IOException {
+ initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, TableMultiRegionInputFormat.class);
+ }
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars)
+ throws IOException {
+ initTableMapperJob(table, scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, TableMultiRegionInputFormat.class);
+ }
+
+ /**
+ * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
+ * direct memory will likely cause the map tasks to OOM when opening the region. This
+ * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
+ * wants to override this behavior in their job.
+ */
+ public static void resetCacheConfig(Configuration conf) {
+ conf.setFloat(
+ HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
+ conf.setFloat("hbase.bucketcache.size", 0f);
+ }
+
+ /**
+ * Sets up the job for reading from a table snapshot. It bypasses hbase servers
+ * and read directly from snapshot files.
+ *
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ *
+ * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restore directory can be deleted.
+ * @throws java.io.IOException When setting up the details fails.
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Path tmpRestoreDir)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
+ initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
+ resetCacheConfig(job.getConfiguration());
+ }
+
+ /**
+ * Use this before submitting a Multi TableMap job. It will appropriately set
+ * up the job.
+ *
+ * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(List<Scan> scans,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+ true);
+ }
+
+ /**
+ * Use this before submitting a Multi TableMap job. It will appropriately set
+ * up the job.
+ *
+ * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the
+ * configured job classes via the distributed cache (tmpjars).
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(List<Scan> scans,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, Job job,
+ boolean addDependencyJars) throws IOException {
+ initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+ addDependencyJars, true);
+ }
+
+ /**
+ * Use this before submitting a Multi TableMap job. It will appropriately set
+ * up the job.
+ *
+ * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is carrying
+ * all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the
+ * configured job classes via the distributed cache (tmpjars).
+ * @param initCredentials whether to initialize hbase auth credentials for the job
+ * @throws java.io.IOException When setting up the details fails.
+ */
+ public static void initTableMapperJob(List<Scan> scans,
+ Class<? extends TableMapper> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, Job job,
+ boolean addDependencyJars,
+ boolean initCredentials) throws IOException {
+ job.setInputFormatClass(MultiTableInputFormat.class);
+ if (outputValueClass != null) {
+ job.setMapOutputValueClass(outputValueClass);
+ }
+ if (outputKeyClass != null) {
+ job.setMapOutputKeyClass(outputKeyClass);
+ }
+ job.setMapperClass(mapper);
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+ List<String> scanStrings = new ArrayList<String>();
+
+ for (Scan scan : scans) {
+ scanStrings.add(convertScanToString(scan));
+ }
+ job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+ scanStrings.toArray(new String[scanStrings.size()]));
+
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+
+ if (initCredentials) {
+ initCredentials(job);
+ }
+ }
+
+ public static void initCredentials(Job job) throws IOException {
+ UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+ if (userProvider.isHadoopSecurityEnabled()) {
+ // propagate delegation related props from launcher job to MR job
+ if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+ job.getConfiguration().set("mapreduce.job.credentials.binary",
+ System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+ }
+ }
+
+ if (userProvider.isHBaseSecurityEnabled()) {
+ try {
+ // init credentials for remote cluster
+ String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
+ User user = userProvider.getCurrent();
+ if (quorumAddress != null) {
+ Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+ ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+ obtainAuthTokenForJob(job, peerConf, user);
+ }
+
+ obtainAuthTokenForJob(job, job.getConfiguration(), user);
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted obtaining user authentication token");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Obtain an authentication token, for the specified cluster, on behalf of the current user
+ * and add it to the credentials for the given map reduce job.
+ *
+ * The quorumAddress is the key to the ZK ensemble, which contains:
+ * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
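+ * (for example <code>server1,server2,server3:2181:/hbase</code>).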
+ *
+ * @param job The job that requires the permission.
+ * @param quorumAddress string that contains the 3 required configurations
+ * @throws java.io.IOException When the authentication token cannot be obtained.
+ */
+ public static void initCredentialsForCluster(Job job, String quorumAddress)
+ throws IOException {
+ UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+ if (userProvider.isHBaseSecurityEnabled()) {
+ try {
+ Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+ ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+ obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted obtaining user authentication token");
+ Thread.interrupted();
+ }
+ }
+ }
+
+ private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
+ throws IOException, InterruptedException {
+ Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
+ if (authToken == null) {
+ user.obtainAuthTokenForJob(conf, job);
+ } else {
+ job.getCredentials().addToken(authToken.getService(), authToken);
+ }
+ }
+
+ /**
+ * Get the authentication token of the user for the cluster specified in the configuration
+ * @return null if the user does not have the token, otherwise the auth token for the cluster.
+ */
+ private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
+ throws IOException, InterruptedException {
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
+ try {
+ String clusterId = ZKClusterId.readClusterIdZNode(zkw);
+ return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ } finally {
+ zkw.close();
+ }
+ }
+
+ /**
+ * Writes the given scan into a Base64 encoded string.
+ *
+ * @param scan The scan to write out.
+ * @return The scan saved in a Base64 encoded string.
+ * @throws java.io.IOException When writing the scan fails.
+ */
+ static String convertScanToString(Scan scan) throws IOException {
+ ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
+ return Base64.encodeBytes(proto.toByteArray());
+ }
+
+ /**
+ * Converts the given Base64 string back into a Scan instance.
+ *
+ * @param base64 The scan details.
+ * @return The newly created Scan instance.
+ * @throws java.io.IOException When reading the scan instance fails.
+ */
+ static Scan convertStringToScan(String base64) throws IOException {
+ byte [] decoded = Base64.decode(base64);
+ ClientProtos.Scan scan;
+ try {
+ scan = ClientProtos.Scan.parseFrom(decoded);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw new IOException(ipbe);
+ }
+
+ return ProtobufUtil.toScan(scan);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job to adjust.
+ * @throws java.io.IOException When determining the region count fails.
+ */
+ public static void initTableReducerJob(String table,
+ Class<? extends TableReducer> reducer, Job job)
+ throws IOException {
+ initTableReducerJob(table, reducer, job, null);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job to adjust.
+ * @param partitioner Partitioner to use. Pass null to use
+ * default partitioner.
+ * @throws java.io.IOException When determining the region count fails.
+ */
+ public static void initTableReducerJob(String table,
+ Class<? extends TableReducer> reducer, Job job,
+ Class partitioner) throws IOException {
+ initTableReducerJob(table, reducer, job, partitioner, null, null, null);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param partitioner Partitioner to use. Pass null to use
+ * default partitioner.
+ * @param quorumAddress Distant cluster to write to; default is null for
+ * output to the cluster that is designated in hbase-site.xml.
+ * Set this String to the zookeeper ensemble of an alternate remote cluster
+ * when you would have the reduce write a cluster that is other than the
+ * default; e.g. copying tables between clusters, the source would be
+ * designated by hbase-site.xml and this param would have the
+ * ensemble address of the remote cluster. The format to pass is particular.
+ * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
+ * such as server,server2,server3:2181:/hbase.
+ * @param serverClass redefined hbase.regionserver.class
+ * @param serverImpl redefined hbase.regionserver.impl
+ * @throws java.io.IOException When determining the region count fails.
+ */
+ public static void initTableReducerJob(String table,
+ Class<? extends TableReducer> reducer, Job job,
+ Class partitioner, String quorumAddress, String serverClass,
+ String serverImpl) throws IOException {
+ initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
+ serverClass, serverImpl, true);
+ }
+
+ /**
+ * Use this before submitting a TableReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table The output table.
+ * @param reducer The reducer class to use.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param partitioner Partitioner to use. Pass null to use
+ * default partitioner.
+ * @param quorumAddress Distant cluster to write to; default is null for
+ * output to the cluster that is designated in hbase-site.xml.
+ * Set this String to the zookeeper ensemble of an alternate remote cluster
+ * when you would have the reduce write a cluster that is other than the
+ * default; e.g. copying tables between clusters, the source would be
+ * designated by hbase-site.xml and this param would have the
+ * ensemble address of the remote cluster. The format to pass is particular.
+ * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
+ * such as server,server2,server3:2181:/hbase.
+ * @param serverClass redefined hbase.regionserver.class
+ * @param serverImpl redefined hbase.regionserver.impl
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @throws java.io.IOException When determining the region count fails.
+ */
+ public static void initTableReducerJob(String table,
+ Class<? extends TableReducer> reducer, Job job,
+ Class partitioner, String quorumAddress, String serverClass,
+ String serverImpl, boolean addDependencyJars) throws IOException {
+
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+ job.setOutputFormatClass(TableOutputFormat.class);
+ if (reducer != null) job.setReducerClass(reducer);
+ conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+ conf.setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName());
+ // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
+ if (quorumAddress != null) {
+ // Calling this will validate the format
+ ZKUtil.transformClusterKey(quorumAddress);
+ conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
+ }
+ if (serverClass != null && serverImpl != null) {
+ conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
+ conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
+ }
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Writable.class);
+ if (partitioner == HRegionPartitioner.class) {
+ job.setPartitionerClass(HRegionPartitioner.class);
+ int regions = MetaTableAccessor.getRegionCount(conf, table);
+ if (job.getNumReduceTasks() > regions) {
+ job.setNumReduceTasks(regions);
+ }
+ } else if (partitioner != null) {
+ job.setPartitionerClass(partitioner);
+ }
+
+ if (addDependencyJars) {
+ addDependencyJars(job);
+ }
+
+ initCredentials(job);
+ }
+
+ /**
+ * Ensures that the given number of reduce tasks for the given job
+ * configuration does not exceed the number of regions for the given table.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job to adjust.
+ * @throws java.io.IOException When retrieving the table details fails.
+ */
+ public static void limitNumReduceTasks(String table, Job job)
+ throws IOException {
+ int regions = MetaTableAccessor.getRegionCount(job.getConfiguration(), table);
+ if (job.getNumReduceTasks() > regions)
+ job.setNumReduceTasks(regions);
+ }
+
+ /**
+ * Sets the number of reduce tasks for the given job configuration to the
+ * number of regions the given table has.
+ *
+ * @param table The table to get the region count for.
+ * @param job The current job to adjust.
+ * @throws java.io.IOException When retrieving the table details fails.
+ */
+ public static void setNumReduceTasks(String table, Job job)
+ throws IOException {
+ job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(), table));
+ }
+
+ /**
+ * Sets the number of rows to return and cache with each scanner iteration.
+ * Higher caching values will enable faster mapreduce jobs at the expense of
+ * requiring more heap to contain the cached rows.
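+ * For example, <code>setScannerCaching(job, 500)</code> makes each scanner fetch 500 rows
+ * per round trip to the region server (an illustrative value).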
+ *
+ * @param job The current job to adjust.
+ * @param batchSize The number of rows to return in batch with each scanner
+ * iteration.
+ */
+ public static void setScannerCaching(Job job, int batchSize) {
+ job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
+ }
+
+ /**
+ * Add HBase and its dependencies (only) to the job configuration.
+ *
+ * This is intended as a low-level API, facilitating code reuse between this
+ * class and its mapred counterpart. It is also of use to external tools that
+ * need to build a MapReduce job that interacts with HBase but want
+ * fine-grained control over the jars shipped to the cluster.
+ *
+ * @param conf The Configuration object to extend with dependencies.
+ * @see org.apache.hadoop.hbase.mapred.TableMultiRegionMapReduceUtil
+ * @see PIG-3285
+ */
+ public static void addHBaseDependencyJars(Configuration conf) throws IOException {
+ addDependencyJars(conf,
+ // explicitly pull a class from each module
+ HConstants.class, // hbase-common
+ ClientProtos.class, // hbase-protocol
+ Put.class, // hbase-client
+ org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat
+ TableMapper.class, // hbase-server
+ // pull necessary dependencies
+ org.apache.zookeeper.ZooKeeper.class,
+ io.netty.channel.Channel.class,
+ com.google.protobuf.Message.class,
+ com.google.common.collect.Lists.class,
+ org.htrace.Trace.class);
+ }
+
+ /**
+ * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
+ * Also exposed to shell scripts via `bin/hbase mapredcp`.
+ */
+ public static String buildDependencyClasspath(Configuration conf) {
+ if (conf == null) {
+ throw new IllegalArgumentException("Must provide a configuration object.");
+ }
+ Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
+ if (paths.size() == 0) {
+ throw new IllegalArgumentException("Configuration contains no tmpjars.");
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String s : paths) {
+ // entries can take the form 'file:/path/to/file.jar'.
+ int idx = s.indexOf(":");
+ if (idx != -1) s = s.substring(idx + 1);
+ if (sb.length() > 0) sb.append(File.pathSeparator);
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Add the HBase dependency jars as well as jars for any of the configured
+ * job classes to the job configuration, so that JobClient will ship them
+ * to the cluster and add them to the DistributedCache.
+ */
+ public static void addDependencyJars(Job job) throws IOException {
+ addHBaseDependencyJars(job.getConfiguration());
+ try {
+ addDependencyJars(job.getConfiguration(),
+ // when making changes here, consider also mapred.TableMultiRegionMapReduceUtil
+ // pull job classes
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ job.getInputFormatClass(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass(),
+ job.getOutputFormatClass(),
+ job.getPartitionerClass(),
+ job.getCombinerClass());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Add the jars containing the given classes to the job's configuration
+ * such that JobClient will ship them to the cluster and add them to
+ * the DistributedCache.
+ */
+ public static void addDependencyJars(Configuration conf,
+ Class<?>... classes) throws IOException {
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Set<String> jars = new HashSet<String>();
+ // Add jars that are already in the tmpjars variable
+ jars.addAll(conf.getStringCollection("tmpjars"));
+
+ // add jars as we find them to a map of contents jar name so that we can avoid
+ // creating new jars for classes that have already been packaged.
+ Map<String, String> packagedClasses = new HashMap<String, String>();
+
+ // Add jars containing the specified classes
+ for (Class<?> clazz : classes) {
+ if (clazz == null) continue;
+
+ Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+ if (path == null) {
+ LOG.warn("Could not find jar for class " + clazz +
+ " in order to ship it to the cluster.");
+ continue;
+ }
+ if (!localFs.exists(path)) {
+ LOG.warn("Could not validate jar file " + path + " for class "
+ + clazz);
+ continue;
+ }
+ jars.add(path.toString());
+ }
+ if (jars.isEmpty()) return;
+
+ conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+ }
+
+ /**
+ * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds
+ * the Jar for a class or creates it if it doesn't exist. If the class is in
+ * a directory in the classpath, it creates a Jar on the fly with the
+ * contents of the directory and returns the path to that Jar. If a Jar is
+ * created, it is created in the system temporary directory. Otherwise,
+ * returns an existing jar that contains a class of the same name. Maintains
+ * a mapping from jar contents to the tmp jar created.
+ * @param my_class the class to find.
+ * @param fs the FileSystem with which to qualify the returned path.
+ * @param packagedClasses a map of class name to path.
+ * @return a jar file that contains the class.
+ * @throws java.io.IOException
+ */
+ private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+ Map<String, String> packagedClasses)
+ throws IOException {
+ // attempt to locate an existing jar for the class.
+ String jar = findContainingJar(my_class, packagedClasses);
+ if (null == jar || jar.isEmpty()) {
+ jar = getJar(my_class);
+ updateMap(jar, packagedClasses);
+ }
+
+ if (null == jar || jar.isEmpty()) {
+ return null;
+ }
+
+ LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
+ return new Path(jar).makeQualified(fs);
+ }
+
+ /**
+ * Add entries to packagedClasses corresponding to class files
+ * contained in jar.
+ * @param jar The jar whose content to list.
+ * @param packagedClasses map[class -> jar]
+ */
+ private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
+ if (null == jar || jar.isEmpty()) {
+ return;
+ }
+ ZipFile zip = null;
+ try {
+ zip = new ZipFile(jar);
+ for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+ ZipEntry entry = iter.nextElement();
+ if (entry.getName().endsWith("class")) {
+ packagedClasses.put(entry.getName(), jar);
+ }
+ }
+ } finally {
+ if (null != zip) zip.close();
+ }
+ }
+
+ /**
+ * Find a jar that contains a class of the same name, if any. It will return
+ * a jar file, even if that is not the first thing on the class path that
+ * has a class with the same name. Looks first on the classpath and then in
+ * the packagedClasses map.
+ * @param my_class the class to find.
+ * @return a jar file that contains the class, or null.
+ * @throws java.io.IOException
+ */
+ private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+ throws IOException {
+ ClassLoader loader = my_class.getClassLoader();
+ String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+ // first search the classpath
+ for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+ URL url = itr.nextElement();
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ // URLDecoder is a misnamed class, since it actually decodes
+ // x-www-form-urlencoded MIME type rather than actual
+ // URL encoding (which the file path has). Therefore it would
+ // decode +s to ' 's which is incorrect (spaces are actually
+ // either unencoded or encoded as "%20"). Replace +s first, so
+ // that they are kept sacred during the decoding process.
+ toReturn = toReturn.replaceAll("\\+", "%2B");
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+
+ // now look in any jars we've packaged using JarFinder. Returns null when
+ // no jar is found.
+ return packagedClasses.get(class_file);
+ }
+
+ /**
+ * Invoke 'getJar' on a JarFinder implementation. Useful for some job
+ * configuration contexts (HBASE-8140) and also for testing on MRv2. First
+ * check if we have HADOOP-9426. Lacking that, fall back to the backport.
+ * @param my_class the class to find.
+ * @return a jar file that contains the class, or null.
+ */
+ private static String getJar(Class<?> my_class) {
+ String ret = null;
+ String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
+ Class<?> jarFinder = null;
+ try {
+ LOG.debug("Looking for " + hadoopJarFinder + ".");
+ jarFinder = Class.forName(hadoopJarFinder);
+ LOG.debug(hadoopJarFinder + " found.");
+ Method getJar = jarFinder.getMethod("getJar", Class.class);
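+ // getJar is a static method, so invoke() is called with a null receiver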
+ ret = (String) getJar.invoke(null, my_class);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("Using backported JarFinder.");
+ ret = JarFinder.getJar(my_class);
+ } catch (InvocationTargetException e) {
+ // the method was properly called, but threw its own exception. Unwrap it
+ // and pass it on.
+ throw new RuntimeException(e.getCause());
+ } catch (Exception e) {
+ // toss all other exceptions, related to reflection failure
+ throw new RuntimeException("getJar invocation failed.", e);
+ }
+
+ return ret;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java
new file mode 100644
index 0000000..a6ffffb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+
+/**
+ * TestTableMultiRegionInputFormatScan part 1.
+ * @see org.apache.hadoop.hbase.mapreduce.TestTableMultiRegionInputFormatScanBase
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableMultiRegionInputFormatScan1 extends TestTableMultiRegionInputFormatScanBase {
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, null, null);
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToAPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBA()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "bba", "baz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBB()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "bbb", "bba");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToOPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "opp", "opo");
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java
new file mode 100644
index 0000000..92fb67e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableMultiRegionInputFormatScan part 2.
+ * @see org.apache.hadoop.hbase.mapreduce.TestTableMultiRegionInputFormatScanBase
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableMultiRegionInputFormatScan2 extends TestTableMultiRegionInputFormatScanBase {
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOBBToOPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("obb", "opp", "opo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOBBToQPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("obb", "qpp", "qpo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOPPToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("opp", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYYXToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yyx", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYYYToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yyy", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYZYToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yzy", null, "zzz");
+ }
+
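+ /**
+ * Tests an MR scan initialized from properties set in the Configuration.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */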
+ @Test
+ public void testScanFromConfiguration()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScanFromConfiguration("bba", "bbd", "bbc");
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java
new file mode 100644
index 0000000..6a4d30d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java
@@ -0,0 +1,242 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests various scan start and stop row scenarios. These are set on a scan and
+ * verified in a MapReduce job to make sure they are handed over and applied
+ * properly.
+ *
+ * This test is copied from another test: TestRegionInputFormatScanBase.
+ */
+public abstract class TestTableMultiRegionInputFormatScanBase {
+
+ static final Log LOG = LogFactory.getLog(TestTableMultiRegionInputFormatScanBase.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stpRow";
+
+ private static HTable table = null;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+ // this turns it off for this test. TODO: Figure out why scr breaks recovery.
+ System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+ // switch TIF to log at DEBUG level
+ TEST_UTIL.enableDebug(TableMultiRegionInputFormat.class);
+ TEST_UTIL.enableDebug(TableMultiRegionInputFormatBase.class);
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+ // create and fill table
+ table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
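+ // pre-split the table into multiple regions so the scan spans more than one region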
+ TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+ TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
+ // start MR cluster
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Pass the key and value to reduce.
+ */
+ public static class ScanMapper
+ extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ /**
+ * Pass the key and value to reduce.
+ *
+ * @param key The key, here "aaa", "aab" etc.
+ * @param value The value is the same as the key.
+ * @param context The task context.
+ * @throws java.io.IOException When reading the rows fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if(!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+ LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
+ ", value -> " + val);
+ context.write(key, key);
+ }
+
+ }
+
+ /**
+ * Checks the last and first key seen against the scanner boundaries.
+ */
+ public static class ScanReducer
+ extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
+
+ private String first = null;
+ private String last = null;
+
+ protected void reduce(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (ImmutableBytesWritable value : values) {
+ String val = Bytes.toStringBinary(value.get());
+ LOG.info("reduce: key[" + count + "] -> " +
+ Bytes.toStringBinary(key.get()) + ", value -> " + val);
+ if (first == null) first = val;
+ last = val;
+ count++;
+ }
+ }
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ Configuration c = context.getConfiguration();
+ String startRow = c.get(KEY_STARTROW);
+ String lastRow = c.get(KEY_LASTROW);
+ LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+ LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+ if (startRow != null && startRow.length() > 0) {
+ assertEquals(startRow, first);
+ }
+ if (lastRow != null && lastRow.length() > 0) {
+ assertEquals(lastRow, last);
+ }
+ }
+
+ }
+
+ /**
+ * Tests an MR Scan initialized from properties set in the Configuration.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ protected void testScanFromConfiguration(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
+ "To" + (stop != null ? stop.toUpperCase() : "Empty");
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ c.set(TableMultiRegionInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
+ c.set(TableMultiRegionInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
+ c.set(KEY_STARTROW, start != null ? start : "");
+ c.set(KEY_LASTROW, last != null ? last : "");
+
+ if (start != null) {
+ c.set(TableMultiRegionInputFormat.SCAN_ROW_START, start);
+ }
+
+ if (stop != null) {
+ c.set(TableMultiRegionInputFormat.SCAN_ROW_STOP, stop);
+ }
+
+ Job job = new Job(c, jobName);
+ job.setMapperClass(ScanMapper.class);
+ job.setReducerClass(ScanReducer.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(ImmutableBytesWritable.class);
+ job.setInputFormatClass(TableMultiRegionInputFormat.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ TableMultiRegionMapReduceUtil.addDependencyJars(job);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ protected void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
+ "To" + (stop != null ? stop.toUpperCase() : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ c.set(KEY_STARTROW, start != null ? start : "");
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
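+ // the stop row is exclusive, so "last" is the final row the scan is expected to emit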
+ c.set(KEY_LASTROW, last != null ? last : "");
+ LOG.info("scan before: " + scan);
+ Job job = new Job(c, jobName);
+ TableMultiRegionMapReduceUtil.initTableMapperJob(
+ Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+ job.setReducerClass(ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+}
+
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java
new file mode 100644
index 0000000..0a4f651
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Test different variants of initTableMapperJob method
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestTableMultiRegionMapReduceUtil {
+
+ /*
+ * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
+ * the method depends on an online cluster.
+ */
+
+ @Test
+ public void testInitTableMapperJob1() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ // test
+ TableMultiRegionMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
+ Text.class, job, false, HLogInputFormat.class);
+ assertEquals(HLogInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob2() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
+ assertEquals(HLogInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob3() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job);
+ assertEquals(TableMultiRegionInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob4() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job, false);
+ assertEquals(TableMultiRegionInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE));
+ }
+}