diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java new file mode 100644 index 0000000..6e55521 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormat.java @@ -0,0 +1,219 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; + +/** + * Convert HBase tabular data into a format that is consumable by Map/Reduce. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class TableMultiRegionInputFormat extends TableMultiRegionInputFormatBase +implements Configurable { + + private final Log LOG = LogFactory.getLog(TableMultiRegionInputFormat.class); + + /** Job parameter that specifies the input table. */ + public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; + /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. + * See {@link org.apache.hadoop.hbase.mapreduce + * .TableMultiRegionMapReduceUtil#convertScanToString(org.apache.hadoop.hbase.client.Scan)} for + * more details. + */ + public static final String SCAN = "hbase.mapreduce.scan"; + /** Scan start row */ + public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start"; + /** Scan stop row */ + public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop"; + /** Column Family to Scan */ + public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family"; + /** Space delimited list of columns and column families to scan. */ + public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns"; + /** The timestamp used to filter columns with a specific timestamp. */ + public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp"; + /** The starting timestamp used to filter columns with a specific range of versions. */ + public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start"; + /** The ending timestamp used to filter columns with a specific range of versions. 
*/ + public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end"; + /** The maximum number of version to return. */ + public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions"; + /** Set to false to disable server-side caching of blocks for this scan. */ + public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks"; + /** The number of rows for caching that will be passed to scanners. */ + public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; + /** Set the maximum number of values to return for each call to next(). */ + public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize"; + + /** The configuration. */ + private Configuration conf = null; + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to set the details for the table to + * be scanned. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + String tableName = conf.get(INPUT_TABLE); + try { + setHTable(new HTable(new Configuration(conf), tableName)); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + + Scan scan = null; + + if (conf.get(SCAN) != null) { + try { + scan = TableMultiRegionMapReduceUtil.convertStringToScan(conf.get(SCAN)); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + } else { + try { + scan = new Scan(); + + if (conf.get(SCAN_ROW_START) != null) { + scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START))); + } + + if (conf.get(SCAN_ROW_STOP) != null) { + scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP))); + } + + if (conf.get(SCAN_COLUMNS) != null) { + addColumns(scan, conf.get(SCAN_COLUMNS)); + } + + if (conf.get(SCAN_COLUMN_FAMILY) != null) { + scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); + } + + if (conf.get(SCAN_TIMESTAMP) != null) { + scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); + } + + if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { + scan.setTimeRange( + Long.parseLong(conf.get(SCAN_TIMERANGE_START)), + Long.parseLong(conf.get(SCAN_TIMERANGE_END))); + } + + if (conf.get(SCAN_MAXVERSIONS) != null) { + scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); + } + + if (conf.get(SCAN_CACHEDROWS) != null) { + scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); + } + + if (conf.get(SCAN_BATCHSIZE) != null) { + scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); + } + + // false by default, full table scans generate too much BC churn + scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + setScan(scan); + } + + /** + * Parses a combined family and qualifier and adds either both or just the + * family in case there is no qualifier. This assumes the older colon + * divided notation, e.g. "family:qualifier". + * + * @param scan The Scan to update. + * @param familyAndQualifier family and qualifier + * @return A reference to this instance. + * @throws IllegalArgumentException When familyAndQualifier is invalid. 
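For context, a minimal driver-side sketch, assuming the usual Configuration/HBaseConfiguration/Job imports; the table and column names are placeholders and not part of this patch. It shows the keys that setConf above consumes, including the space-delimited family:qualifier form parsed by the column helpers below.

  // Sketch only: "test_table" and "info:c1 info:c2" are placeholder values.
  Configuration conf = HBaseConfiguration.create();
  conf.set(TableMultiRegionInputFormat.INPUT_TABLE, "test_table");
  conf.set(TableMultiRegionInputFormat.SCAN_COLUMNS, "info:c1 info:c2"); // space-delimited family:qualifier
  conf.set(TableMultiRegionInputFormat.SCAN_CACHEDROWS, "500");
  Job job = Job.getInstance(conf, "multi-region-scan");
  job.setInputFormatClass(TableMultiRegionInputFormat.class);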
+ */ + private static void addColumn(Scan scan, byte[] familyAndQualifier) { + byte [][] fq = KeyValue.parseColumn(familyAndQualifier); + if (fq.length == 1) { + scan.addFamily(fq[0]); + } else if (fq.length == 2) { + scan.addColumn(fq[0], fq[1]); + } else { + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + } + } + + /** + * Adds an array of columns specified using old format, family:qualifier. + *
+ * Overrides previous calls to {@link org.apache.hadoop.hbase.client.Scan#addColumn(byte[], + * byte[])}for any families in the + * input. + * + * @param scan The Scan to update. + * @param columns array of columns, formatted as family:qualifier + * @see org.apache.hadoop.hbase.client.Scan#addColumn(byte[], byte[]) + */ + public static void addColumns(Scan scan, byte [][] columns) { + for (byte[] column : columns) { + addColumn(scan, column); + } + } + + /** + * Convenience method to parse a string representation of an array of column specifiers. + * + * @param scan The Scan to update. + * @param columns The columns to parse. + */ + private static void addColumns(Scan scan, String columns) { + String[] cols = columns.split(" "); + for (String col : cols) { + addColumn(scan, Bytes.toBytes(col)); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java new file mode 100644 index 0000000..11d24bf --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionInputFormatBase.java @@ -0,0 +1,296 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import javax.naming.NamingException; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class TableMultiRegionInputFormatBase + extends InputFormat { + + final Log LOG = LogFactory.getLog(TableMultiRegionInputFormatBase.class); + + /** + * Holds the details for the internal scanner. + */ + private Scan scan = null; + /** + * The table to scan. + */ + private HTable table = null; + /** + * The reader scanning the table, can be a custom one. 
+ */ + private TableRecordReader tableRecordReader = null; + + + /** + * The reverse DNS lookup cache mapping: IPAddress => HostName + */ + private HashMap<InetAddress, String> reverseDNSCacheMap = + new HashMap<InetAddress, String>(); + + /** + * The NameServer address + */ + private String nameServer = null; + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses + * the default. + * + * @param split The split to work with. + * @param context The current context. + * @return The newly created record reader. + * @throws java.io.IOException When creating the reader fails. + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( + *org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public RecordReader<ImmutableBytesWritable, Result> createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException { + if (table == null) { + throw new IOException("Cannot create a record reader because of a" + + " previous error. Please look at the previous log lines from" + + " the task's full log for more details."); + } + TableSplit tSplit = (TableSplit) split; + LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " " + + "bytes."); + TableRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new TableRecordReader(); + } + Scan sc = new Scan(this.scan); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setHTable(table); + return trr; + } + + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits is the number of regions in the table divided by + * hbase.mapreduce.scan.regionspermapper, rounded up. + * + * @param context The current job context. + * @return The list of input splits. + * @throws java.io.IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + *org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException { + if (table == null) { + throw new IOException("No table was provided."); + } + // Get the name server address; the default value is null. 
+ this.nameServer = + context.getConfiguration().get("hbase.nameserver.address", null); + + // Get the number of regions per mapper, the default value is one region for one mapper + String regionPerMapper = context.getConfiguration().get("hbase.mapreduce.scan" + + ".regionspermapper", "1"); + int regionPerMapperInt = 1; + try { + regionPerMapperInt = Integer.parseInt(regionPerMapper); + LOG.debug("Number of regions per mapper: " + regionPerMapperInt); + } catch (NumberFormatException e) { + LOG.error("ERROR when parseInt: hbase.mapreduce.scan.regionspermapper must be an " + + "integer"); + } + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); + + Pair keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List splits = new ArrayList(1); + long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); + TableSplit split = new TableSplit(table.getName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); + splits.add(split); + return splits; + } + int splitsNumber = keys.getFirst().length; + try { + double splitsNumberDouble = (double) keys.getFirst().length / regionPerMapperInt; + splitsNumber = (int) Math.ceil(splitsNumberDouble); + } catch (NumberFormatException e) { + LOG.error("Number of mappers: " + keys.getFirst().length + " / " + regionPerMapperInt); + } + List splits = new ArrayList(splitsNumber); + for (int i = 0; i * regionPerMapperInt < keys.getFirst().length; i++) { + int startRegion = i * regionPerMapperInt; + int stopRegion = (i * regionPerMapperInt + regionPerMapperInt - 1 < keys.getFirst().length) ? + (i * regionPerMapperInt + regionPerMapperInt - 1) : (keys.getFirst().length - 1); + + ArrayList regionLocationList = new ArrayList(); + ArrayList regionNameList = new ArrayList(); + for (int j = startRegion; j <= stopRegion; j++) { + + HRegionLocation location = table.getRegionLocation(keys.getFirst()[j], false); + // The below InetSocketAddress creation does a name resolution. + InetSocketAddress isa = new InetSocketAddress(location.getHostname(), + location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); + } + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + try { + regionLocation = reverseDNS(regionAddress); + } catch (NamingException e) { + LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); + regionLocation = location.getHostname(); + } + regionLocationList.add(regionLocation); + byte[] regionName = location.getRegionInfo().getRegionName(); + regionNameList.add(regionName); + + } + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the regions + if ((startRow.length == 0 || keys.getSecond()[stopRegion].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[stopRegion]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[startRegion]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[startRegion], startRow) >= 0 ? 
+ keys.getFirst()[startRegion] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[stopRegion], stopRow) <= 0) && + keys.getSecond()[stopRegion].length > 0 ? + keys.getSecond()[stopRegion] : stopRow; + + long regionSize = 0; + for (int k = 0; k < regionNameList.size(); k++) { + regionSize += sizeCalculator.getRegionSize(regionNameList.get(k)); + } + + //Here use the location of the first region in the List + TableSplit split = new TableSplit(table.getName(), + splitStart, splitStop, regionLocationList.get(0), regionSize); + splits.add(split); + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + i + " -> " + split); + } + } + } + return splits; + } + + private String reverseDNS(InetAddress ipAddress) throws NamingException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + hostName = Strings.domainNamePointerToHostName( + DNS.reverseDns(ipAddress, this.nameServer)); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + /** + * Test if the given region is to be included in the InputSplit while splitting + * the regions of a table. + * + * @param startKey Start key of the region + * @param endKey End key of the region + * @return true, if this region needs to be included as part of the input (default). + */ + protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { + return true; + } + + /** + * Allows subclasses to get the {@link org.apache.hadoop.hbase.client.HTable}. + */ + protected HTable getHTable() { + return this.table; + } + + /** + * Allows subclasses to set the {@link org.apache.hadoop.hbase.client.HTable}. + * + * @param table The table to get the data from. + */ + protected void setHTable(HTable table) { + this.table = table; + } + + /** + * Gets the scan defining the actual details like columns etc. + * + * @return The internal scan instance. + */ + public Scan getScan() { + if (this.scan == null) this.scan = new Scan(); + return scan; + } + + /** + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. + */ + public void setScan(Scan scan) { + this.scan = scan; + } + + /** + * Allows subclasses to set the {@link org.apache.hadoop.hbase.mapreduce.TableRecordReader}. + * + * @param tableRecordReader A different {@link org.apache.hadoop.hbase.mapreduce + * .TableRecordReader} + * implementation. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java new file mode 100644 index 0000000..7c12378 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMultiRegionMapReduceUtil.java @@ -0,0 +1,961 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLDecoder; +import java.util.*; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +/** + * Utility for {@link org.apache.hadoop.hbase.mapreduce.TableMapper} and {@link org.apache.hadoop + * .hbase.mapreduce.TableReducer} + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +@InterfaceAudience.Public +@InterfaceStability.Stable +public class TableMultiRegionMapReduceUtil { + static Log LOG = LogFactory.getLog(TableMultiRegionMapReduceUtil.class); + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws java.io.IOException When setting up the details fails. 
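For reference, a hedged usage sketch of the basic overload declared just below; MyMapper, the table name, and the regions-per-mapper value are placeholders, and the usual HBase/MapReduce imports are assumed.

  // Sketch only: MyMapper extends TableMapper<ImmutableBytesWritable, Result> (placeholder).
  Configuration conf = HBaseConfiguration.create();
  conf.setInt("hbase.mapreduce.scan.regionspermapper", 4); // one map task per four regions
  Job job = Job.getInstance(conf, "multi-region-example");
  Scan scan = new Scan();
  scan.setCaching(500);
  scan.setCacheBlocks(false); // full-table MR scans should not churn the block cache
  TableMultiRegionMapReduceUtil.initTableMapperJob(
      "test_table", scan, MyMapper.class,
      ImmutableBytesWritable.class, Result.class, job);
  job.waitForCompletion(true);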
+ */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, + job, true); + } + + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(TableName table, + Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, + Job job) throws IOException { + initTableMapperJob(table.getNameAsString(), + scan, + mapper, + outputKeyClass, + outputValueClass, + job, + true); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, + job, true); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Class inputFormatClass) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true, inputFormatClass); + } + + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. 
+ * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job + * @param inputFormatClass the input format + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, boolean initCredentials, + Class inputFormatClass) + throws IOException { + job.setInputFormatClass(inputFormatClass); + if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); + if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + if (Put.class.equals(outputValueClass)) { + job.setCombinerClass(PutCombiner.class); + } + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + conf.set(TableMultiRegionInputFormat.INPUT_TABLE, table); + conf.set(TableMultiRegionInputFormat.SCAN, convertScanToString(scan)); + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + if (addDependencyJars) { + addDependencyJars(job); + } + if (initCredentials) { + initCredentials(job); + } + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param inputFormatClass The class of the input format + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Class inputFormatClass) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, inputFormatClass); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws java.io.IOException When setting up the details fails. 
+ */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, TableMultiRegionInputFormat.class); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, TableMultiRegionInputFormat.class); + } + + /** + * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on + * direct memory will likely cause the map tasks to OOM when opening the region. This + * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user + * wants to override this behavior in their job. + */ + public static void resetCacheConfig(Configuration conf) { + conf.setFloat( + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); + conf.setFloat("hbase.bucketcache.size", 0f); + } + + /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restore directory can be deleted. + * @throws java.io.IOException When setting up the details fails. 
+ * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + */ + public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Path tmpRestoreDir) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); + initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); + resetCacheConfig(job.getConfiguration()); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(List scans, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job) throws IOException { + initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, + true); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the + * configured job classes via the distributed cache (tmpjars). + * @throws java.io.IOException When setting up the details fails. + */ + public static void initTableMapperJob(List scans, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars) throws IOException { + initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link org.apache.hadoop.hbase.client.Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the + * configured job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job + * @throws java.io.IOException When setting up the details fails. 
+ */ + public static void initTableMapperJob(List scans, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, + boolean initCredentials) throws IOException { + job.setInputFormatClass(MultiTableInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + List scanStrings = new ArrayList(); + + for (Scan scan : scans) { + scanStrings.add(convertScanToString(scan)); + } + job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, + scanStrings.toArray(new String[scanStrings.size()])); + + if (addDependencyJars) { + addDependencyJars(job); + } + + if (initCredentials) { + initCredentials(job); + } + } + + public static void initCredentials(Job job) throws IOException { + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + if (userProvider.isHadoopSecurityEnabled()) { + // propagate delegation related props from launcher job to MR job + if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { + job.getConfiguration().set("mapreduce.job.credentials.binary", + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); + } + } + + if (userProvider.isHBaseSecurityEnabled()) { + try { + // init credentials for remote cluster + String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); + User user = userProvider.getCurrent(); + if (quorumAddress != null) { + Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); + ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + obtainAuthTokenForJob(job, peerConf, user); + } + + obtainAuthTokenForJob(job, job.getConfiguration(), user); + } catch (InterruptedException ie) { + LOG.info("Interrupted obtaining user authentication token"); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user + * and add it to the credentials for the given map reduce job. + * + * The quorumAddress is the key to the ZK ensemble, which contains: + * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent + * + * @param job The job that requires the permission. + * @param quorumAddress string that contains the 3 required configuratins + * @throws java.io.IOException When the authentication token cannot be obtained. 
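A short sketch of calling this helper for a secure remote cluster; the ZooKeeper ensemble string is a placeholder that follows the quorum:port:parent format described above.

  // Sketch only: the ensemble address below is a placeholder.
  TableMultiRegionMapReduceUtil.initCredentialsForCluster(job,
      "zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase");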
+ */ + public static void initCredentialsForCluster(Job job, String quorumAddress) + throws IOException { + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + if (userProvider.isHBaseSecurityEnabled()) { + try { + Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); + ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent()); + } catch (InterruptedException e) { + LOG.info("Interrupted obtaining user authentication token"); + Thread.interrupted(); + } + } + } + + private static void obtainAuthTokenForJob(Job job, Configuration conf, User user) + throws IOException, InterruptedException { + Token authToken = getAuthToken(conf, user); + if (authToken == null) { + user.obtainAuthTokenForJob(conf, job); + } else { + job.getCredentials().addToken(authToken.getService(), authToken); + } + } + + /** + * Get the authentication token of the user for the cluster specified in the configuration + * @return null if the user does not have the token, otherwise the auth token for the cluster. + */ + private static Token getAuthToken(Configuration conf, User user) + throws IOException, InterruptedException { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null); + try { + String clusterId = ZKClusterId.readClusterIdZNode(zkw); + return new AuthenticationTokenSelector().selectToken(new Text(clusterId), + user.getUGI().getTokens()); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + zkw.close(); + } + } + + /** + * Writes the given scan into a Base64 encoded string. + * + * @param scan The scan to write out. + * @return The scan saved in a Base64 encoded string. + * @throws java.io.IOException When writing the scan fails. + */ + static String convertScanToString(Scan scan) throws IOException { + ClientProtos.Scan proto = ProtobufUtil.toScan(scan); + return Base64.encodeBytes(proto.toByteArray()); + } + + /** + * Converts the given Base64 string back into a Scan instance. + * + * @param base64 The scan details. + * @return The newly created Scan instance. + * @throws java.io.IOException When reading the scan instance fails. + */ + static Scan convertStringToScan(String base64) throws IOException { + byte [] decoded = Base64.decode(base64); + ClientProtos.Scan scan; + try { + scan = ClientProtos.Scan.parseFrom(decoded); + } catch (InvalidProtocolBufferException ipbe) { + throw new IOException(ipbe); + } + + return ProtobufUtil.toScan(scan); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. + * @throws java.io.IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class reducer, Job job) + throws IOException { + initTableReducerJob(table, reducer, job, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. + * @param partitioner Partitioner to use. Pass null to use + * default partitioner. + * @throws java.io.IOException When determining the region count fails. 
+ */ + public static void initTableReducerJob(String table, + Class reducer, Job job, + Class partitioner) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, null, null, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param partitioner Partitioner to use. Pass null to use + * default partitioner. + * @param quorumAddress Distant cluster to write to; default is null for + * output to the cluster that is designated in hbase-site.xml. + * Set this String to the zookeeper ensemble of an alternate remote cluster + * when you would have the reduce write a cluster that is other than the + * default; e.g. copying tables between clusters, the source would be + * designated by hbase-site.xml and this param would have the + * ensemble address of the remote cluster. The format to pass is particular. + * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode + * .parent> + * such as server,server2,server3:2181:/hbase. + * @param serverClass redefined hbase.regionserver.class + * @param serverImpl redefined hbase.regionserver.impl + * @throws java.io.IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class reducer, Job job, + Class partitioner, String quorumAddress, String serverClass, + String serverImpl) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, quorumAddress, + serverClass, serverImpl, true); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param partitioner Partitioner to use. Pass null to use + * default partitioner. + * @param quorumAddress Distant cluster to write to; default is null for + * output to the cluster that is designated in hbase-site.xml. + * Set this String to the zookeeper ensemble of an alternate remote cluster + * when you would have the reduce write a cluster that is other than the + * default; e.g. copying tables between clusters, the source would be + * designated by hbase-site.xml and this param would have the + * ensemble address of the remote cluster. The format to pass is particular. + * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode + * .parent> + * such as server,server2,server3:2181:/hbase. + * @param serverClass redefined hbase.regionserver.class + * @param serverImpl redefined hbase.regionserver.impl + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws java.io.IOException When determining the region count fails. 
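A brief sketch of the reduce-side setup documented here; MyTableReducer and the output table name are placeholders, and passing null keeps the default partitioner.

  // Sketch only: MyTableReducer extends TableReducer (placeholder).
  TableMultiRegionMapReduceUtil.initTableReducerJob(
      "output_table", MyTableReducer.class, job, null);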
+ */ + public static void initTableReducerJob(String table, + Class reducer, Job job, + Class partitioner, String quorumAddress, String serverClass, + String serverImpl, boolean addDependencyJars) throws IOException { + + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + job.setOutputFormatClass(TableOutputFormat.class); + if (reducer != null) job.setReducerClass(reducer); + conf.set(TableOutputFormat.OUTPUT_TABLE, table); + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName()); + // If passed a quorum/ensemble address, pass it on to TableOutputFormat. + if (quorumAddress != null) { + // Calling this will validate the format + ZKUtil.transformClusterKey(quorumAddress); + conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); + } + if (serverClass != null && serverImpl != null) { + conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); + conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); + } + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Writable.class); + if (partitioner == HRegionPartitioner.class) { + job.setPartitionerClass(HRegionPartitioner.class); + int regions = MetaTableAccessor.getRegionCount(conf, table); + if (job.getNumReduceTasks() > regions) { + job.setNumReduceTasks(regions); + } + } else if (partitioner != null) { + job.setPartitionerClass(partitioner); + } + + if (addDependencyJars) { + addDependencyJars(job); + } + + initCredentials(job); + } + + /** + * Ensures that the given number of reduce tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job to adjust. + * @throws java.io.IOException When retrieving the table details fails. + */ + public static void limitNumReduceTasks(String table, Job job) + throws IOException { + int regions = MetaTableAccessor.getRegionCount(job.getConfiguration(), table); + if (job.getNumReduceTasks() > regions) + job.setNumReduceTasks(regions); + } + + /** + * Sets the number of reduce tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job to adjust. + * @throws java.io.IOException When retrieving the table details fails. + */ + public static void setNumReduceTasks(String table, Job job) + throws IOException { + job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(), table)); + } + + /** + * Sets the number of rows to return and cache with each scanner iteration. + * Higher caching values will enable faster mapreduce jobs at the expense of + * requiring more heap to contain the cached rows. + * + * @param job The current job to adjust. + * @param batchSize The number of rows to return in batch with each scanner + * iteration. + */ + public static void setScannerCaching(Job job, int batchSize) { + job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); + } + + /** + * Add HBase and its dependencies (only) to the job configuration. + *
+ * This is intended as a low-level API, facilitating code reuse between this + * class and its mapred counterpart. It is also of use to external tools that + * need to build a MapReduce job that interacts with HBase but want + * fine-grained control over the jars shipped to the cluster. + *
+ * @param conf The Configuration object to extend with dependencies. + * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil + * @see PIG-3285 + */ + public static void addHBaseDependencyJars(Configuration conf) throws IOException { + addDependencyJars(conf, + // explicitly pull a class from each module + HConstants.class, // hbase-common + ClientProtos.class, // hbase-protocol + Put.class, // hbase-client + org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat + TableMapper.class, // hbase-server + // pull necessary dependencies + org.apache.zookeeper.ZooKeeper.class, + io.netty.channel.Channel.class, + com.google.protobuf.Message.class, + com.google.common.collect.Lists.class, + org.htrace.Trace.class); + } + + /** + * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. + * Also exposed to shell scripts via `bin/hbase mapredcp`. + */ + public static String buildDependencyClasspath(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("Must provide a configuration object."); + } + Set paths = new HashSet(conf.getStringCollection("tmpjars")); + if (paths.size() == 0) { + throw new IllegalArgumentException("Configuration contains no tmpjars."); + } + StringBuilder sb = new StringBuilder(); + for (String s : paths) { + // entries can take the form 'file:/path/to/file.jar'. + int idx = s.indexOf(":"); + if (idx != -1) s = s.substring(idx + 1); + if (sb.length() > 0) sb.append(File.pathSeparator); + sb.append(s); + } + return sb.toString(); + } + + /** + * Add the HBase dependency jars as well as jars for any of the configured + * job classes to the job configuration, so that JobClient will ship them + * to the cluster and add them to the DistributedCache. + */ + public static void addDependencyJars(Job job) throws IOException { + addHBaseDependencyJars(job.getConfiguration()); + try { + addDependencyJars(job.getConfiguration(), + // when making changes here, consider also mapred.TableMultiRegionMapReduceUtil + // pull job classes + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + job.getInputFormatClass(), + job.getOutputKeyClass(), + job.getOutputValueClass(), + job.getOutputFormatClass(), + job.getPartitionerClass(), + job.getCombinerClass()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** + * Add the jars containing the given classes to the job's configuration + * such that JobClient will ship them to the cluster and add them to + * the DistributedCache. + */ + public static void addDependencyJars(Configuration conf, + Class... classes) throws IOException { + + FileSystem localFs = FileSystem.getLocal(conf); + Set jars = new HashSet(); + // Add jars that are already in the tmpjars variable + jars.addAll(conf.getStringCollection("tmpjars")); + + // add jars as we find them to a map of contents jar name so that we can avoid + // creating new jars for classes that have already been packaged. 
+ Map packagedClasses = new HashMap(); + + // Add jars containing the specified classes + for (Class clazz : classes) { + if (clazz == null) continue; + + Path path = findOrCreateJar(clazz, localFs, packagedClasses); + if (path == null) { + LOG.warn("Could not find jar for class " + clazz + + " in order to ship it to the cluster."); + continue; + } + if (!localFs.exists(path)) { + LOG.warn("Could not validate jar file " + path + " for class " + + clazz); + continue; + } + jars.add(path.toString()); + } + if (jars.isEmpty()) return; + + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + + /** + * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds + * the Jar for a class or creates it if it doesn't exist. If the class is in + * a directory in the classpath, it creates a Jar on the fly with the + * contents of the directory and returns the path to that Jar. If a Jar is + * created, it is created in the system temporary directory. Otherwise, + * returns an existing jar that contains a class of the same name. Maintains + * a mapping from jar contents to the tmp jar created. + * @param my_class the class to find. + * @param fs the FileSystem with which to qualify the returned path. + * @param packagedClasses a map of class name to path. + * @return a jar file that contains the class. + * @throws java.io.IOException + */ + private static Path findOrCreateJar(Class my_class, FileSystem fs, + Map packagedClasses) + throws IOException { + // attempt to locate an existing jar for the class. + String jar = findContainingJar(my_class, packagedClasses); + if (null == jar || jar.isEmpty()) { + jar = getJar(my_class); + updateMap(jar, packagedClasses); + } + + if (null == jar || jar.isEmpty()) { + return null; + } + + LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); + return new Path(jar).makeQualified(fs); + } + + /** + * Add entries to packagedClasses corresponding to class files + * contained in jar. + * @param jar The jar who's content to list. + * @param packagedClasses map[class -> jar] + */ + private static void updateMap(String jar, Map packagedClasses) throws + IOException { + if (null == jar || jar.isEmpty()) { + return; + } + ZipFile zip = null; + try { + zip = new ZipFile(jar); + for (Enumeration iter = zip.entries(); iter.hasMoreElements();) { + ZipEntry entry = iter.nextElement(); + if (entry.getName().endsWith("class")) { + packagedClasses.put(entry.getName(), jar); + } + } + } finally { + if (null != zip) zip.close(); + } + } + + /** + * Find a jar that contains a class of the same name, if any. It will return + * a jar file, even if that is not the first thing on the class path that + * has a class with the same name. Looks first on the classpath and then in + * the packagedClasses map. + * @param my_class the class to find. + * @return a jar file that contains the class, or null. 
+ * @throws java.io.IOException + */ + private static String findContainingJar(Class my_class, Map packagedClasses) + throws IOException { + ClassLoader loader = my_class.getClassLoader(); + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + + // first search the classpath + for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { + URL url = itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + + // now look in any jars we've packaged using JarFinder. Returns null when + // no jar is found. + return packagedClasses.get(class_file); + } + + /** + * Invoke 'getJar' on a JarFinder implementation. Useful for some job + * configuration contexts (HBASE-8140) and also for testing on MRv2. First + * check if we have HADOOP-9426. Lacking that, fall back to the backport. + * @param my_class the class to find. + * @return a jar file that contains the class, or null. + */ + private static String getJar(Class my_class) { + String ret = null; + String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; + Class jarFinder = null; + try { + LOG.debug("Looking for " + hadoopJarFinder + "."); + jarFinder = Class.forName(hadoopJarFinder); + LOG.debug(hadoopJarFinder + " found."); + Method getJar = jarFinder.getMethod("getJar", Class.class); + ret = (String) getJar.invoke(null, my_class); + } catch (ClassNotFoundException e) { + LOG.debug("Using backported JarFinder."); + ret = JarFinder.getJar(my_class); + } catch (InvocationTargetException e) { + // function was properly called, but threw it's own exception. Unwrap it + // and pass it on. + throw new RuntimeException(e.getCause()); + } catch (Exception e) { + // toss all other exceptions, related to reflection failure + throw new RuntimeException("getJar invocation failed.", e); + } + + return ret; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java new file mode 100644 index 0000000..a6ffffb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan1.java @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; + +/** + * TestTableMultiRegionInputFormatScan part 1. + * @see org.apache.hadoop.hbase.mapreduce.TestTableMultiRegionInputFormatScanBase + */ +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableMultiRegionInputFormatScan1 extends TestTableMultiRegionInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBA() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bba", "baz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBB() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bbb", "bba"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "opp", "opo"); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java new file mode 100644 index 0000000..92fb67e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScan2.java @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * TestTableMultiRegionInputFormatScan part 2. + * @see org.apache.hadoop.hbase.mapreduce.TestTableMultiRegionInputFormatScanBase + */ +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableMultiRegionInputFormatScan2 extends TestTableMultiRegionInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToQPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "qpp", "qpo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOPPToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("opp", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYXToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyx", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYZYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + @Test + public void testScanFromConfiguration() + throws IOException, InterruptedException, ClassNotFoundException { + testScanFromConfiguration("bba", "bbd", "bbc"); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java new file mode 100644 index 0000000..6a4d30d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionInputFormatScanBase.java @@ -0,0 +1,242 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + *

+ * Tests various scan start and stop row scenarios. This is set in a scan and + * tested in a MapReduce job to see if that is handed over and done properly + * too. + * + * This test is copied from another test: TestRegionInputFormatScanBase. +
+ */ +public abstract class TestTableMultiRegionInputFormatScanBase { + + static final Log LOG = LogFactory.getLog(TestTableMultiRegionInputFormatScanBase.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + private static HTable table = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. + // this turns it off for this test. TODO: Figure out why scr breaks recovery. + System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); + + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(TableMultiRegionInputFormat.class); + TEST_UTIL.enableDebug(TableMultiRegionInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); + TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Pass the key and value to reduce. + */ + public static class ScanMapper + extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { + + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws java.io.IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + context.write(key, key); + } + + } + + /** + * Checks the last and first key seen against the scanner boundaries.
+ */ + public static class ScanReducer + extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> { + + private String first = null; + private String last = null; + + protected void reduce(ImmutableBytesWritable key, + Iterable<ImmutableBytesWritable> values, Context context) + throws IOException, InterruptedException { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.info("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + } + + protected void cleanup(Context context) + throws IOException, InterruptedException { + Configuration c = context.getConfiguration(); + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + + } + + /** + * Tests an MR Scan initialized from properties set in the Configuration. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScanFromConfiguration(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.set(TableMultiRegionInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME)); + c.set(TableMultiRegionInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY)); + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? last : ""); + + if (start != null) { + c.set(TableMultiRegionInputFormat.SCAN_ROW_START, start); + } + + if (stop != null) { + c.set(TableMultiRegionInputFormat.SCAN_ROW_STOP, stop); + } + + Job job = new Job(c, jobName); + job.setMapperClass(ScanMapper.class); + job.setReducerClass(ScanReducer.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setInputFormatClass(TableMultiRegionInputFormat.class); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + TableMultiRegionMapReduceUtil.addDependencyJars(job); + assertTrue(job.waitForCompletion(true)); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ?
last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + TableMultiRegionMapReduceUtil.initTableMapperJob( + Bytes.toString(TABLE_NAME), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion - job " + jobName); + } + +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java new file mode 100644 index 0000000..0a4f651 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMultiRegionMapReduceUtil.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Test different variants of initTableMapperJob method + */ +@Category({MapReduceTests.class, SmallTests.class}) +public class TestTableMultiRegionMapReduceUtil { + + /* + * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because + * the method depends on an online cluster. 
+ */ + + @Test + public void testInitTableMapperJob1() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + // test + TableMultiRegionMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, + Text.class, job, false, HLogInputFormat.class); + assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob2() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class); + assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob3() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job); + assertEquals(TableMultiRegionInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob4() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMultiRegionMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job, false); + assertEquals(TableMultiRegionInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableMultiRegionInputFormat.INPUT_TABLE)); + } +}
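Reviewer note (illustrative, not part of the patch): the new classes are only exercised through the mini-cluster tests above, so the sketch below shows how a job driver would typically wire them together, mirroring the setup in TestTableMultiRegionInputFormatScanBase#testScan and the overloads asserted in TestTableMultiRegionMapReduceUtil. The driver class, its package, the table name "scantest", the column family "contents" and the output path are placeholders; the only APIs assumed are the initTableMapperJob and addDependencyJars calls already used by the tests.

  package org.apache.hadoop.hbase.mapreduce.example; // hypothetical package, for illustration only

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableMapper;
  import org.apache.hadoop.hbase.mapreduce.TableMultiRegionMapReduceUtil;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /** Hypothetical driver showing the intended use of the multi-region input format. */
  public class MultiRegionScanDriver {

    /** Emits the row key for every row read, like the tests' ScanMapper. */
    static class RowKeyMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        context.write(key, key);
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("contents"));  // placeholder column family
      scan.setCacheBlocks(false);                 // full scans churn the block cache
      Job job = new Job(conf, "multi-region-scan");
      // Same overload the tests call: it sets the mapper, the map output types and
      // TableMultiRegionInputFormat on the job, and records the scan in the configuration.
      TableMultiRegionMapReduceUtil.initTableMapperJob(
          "scantest", scan, RowKeyMapper.class,
          ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
      TableMultiRegionMapReduceUtil.addDependencyJars(job);  // ships containing jars via "tmpjars"
      job.setNumReduceTasks(1);
      FileOutputFormat.setOutputPath(job, new Path("/tmp/multi-region-scan-out"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }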