Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java (revision 0) @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests various scan start and stop row scenarios. This is set in a scan and + * tested in a MapReduce job to see if that is handed over and done properly + * too. 
+ */ +@Category(LargeTests.class) +public class TestMultiTableInputFormat { + + static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final String TABLE_NAME = "scantest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(MultiTableInputFormat.class); + TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + for (int i = 1; i <= 3; i++) { + HTable table = + TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME + String.valueOf(i)), + INPUT_FAMILY); + TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); + TEST_UTIL.loadTable(table, INPUT_FAMILY); + } + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); + } + + /** + * Pass the key and value to reducer. + */ + public static class ScanMapper extends + TableMapper { + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> cf = + value.getMap(); + if (!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + context.write(key, key); + } + } + + /** + * Checks the last and first keys seen against the scanner boundaries. 
+ */ + public static class ScanReducer + extends + Reducer { + private String first = null; + private String last = null; + + protected void reduce(ImmutableBytesWritable key, + Iterable values, Context context) + throws IOException, InterruptedException { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.debug("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + assertEquals(4, count); + } + + protected void cleanup(Context context) throws IOException, + InterruptedException { + Configuration c = context.getConfiguration(); + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOPPToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("opp", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYZYToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = + "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" + + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ? 
last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + + MultiTableInputCollection mtic = new MultiTableInputCollection(); + mtic.Add(TABLE_NAME + "1", scan); + mtic.Add(TABLE_NAME + "1", scan); + mtic.Add(TABLE_NAME + "2", scan); + mtic.Add(TABLE_NAME + "3", scan); + + TableMapReduceUtil.initTableMapperJob(mtic, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (revision 1302724) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (working copy) @@ -27,13 +27,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; +import org.mortbay.log.Log; /** - * A table split corresponds to a key range (low, high). All references to row - * below refer to the key of the row. + * A table split corresponds to a key range (low, high) and an optional scanner. + * All references to row below refer to the key of the row. */ @InterfaceAudience.Public @InterfaceStability.Stable @@ -44,10 +46,11 @@ private byte [] startRow; private byte [] endRow; private String regionLocation; + private String scan = ""; /** Default constructor. */ public TableSplit() { - this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, ""); } @@ -55,25 +58,55 @@ * Creates a new instance while assigning all variables. * * @param tableName The name of the current table. + * @param scan A scan associated with this split. * @param startRow The start row of the split. * @param endRow The end row of the split. * @param location The location of the region. */ - public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow, - final String location) { + public TableSplit(byte [] tableName, Scan scan, byte [] startRow, + byte [] endRow, final String location) { this.tableName = tableName; + try { + this.scan = + (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan); + } catch (IOException e) { + Log.warn("Failed to convert Scan to String", e); + } this.startRow = startRow; this.endRow = endRow; this.regionLocation = location; } /** + * Creates a new instance without a scanner. + * + * @param tableName The name of the current table. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + */ + public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow, + final String location) { + this(tableName, null, startRow, endRow, location); + } + + /** + * Returns a Scan object from the stored string representation. + * + * @return Returns a Scan object based on the stored scanner. 
+ * @throws IOException + */ + public Scan getScan() throws IOException { + return TableMapReduceUtil.convertStringToScan(this.scan); + } + + /** * Returns the table name. * * @return The table name. */ public byte [] getTableName() { - return tableName; + return this.tableName; } /** @@ -111,7 +144,7 @@ */ @Override public String[] getLocations() { - return new String[] {regionLocation}; + return new String[] { regionLocation }; } /** @@ -129,12 +162,13 @@ /** * Reads the values of each field. * - * @param in The input to read from. + * @param in The input to read from. * @throws IOException When reading the input fails. */ @Override public void readFields(DataInput in) throws IOException { tableName = Bytes.readByteArray(in); + scan = Bytes.toString(Bytes.readByteArray(in)); startRow = Bytes.readByteArray(in); endRow = Bytes.readByteArray(in); regionLocation = Bytes.toString(Bytes.readByteArray(in)); @@ -143,12 +177,13 @@ /** * Writes the field values to the output. * - * @param out The output to write to. + * @param out The output to write to. * @throws IOException When writing the values to the output fails. */ @Override public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, tableName); + Bytes.writeByteArray(out, Bytes.toBytes(scan)); Bytes.writeByteArray(out, startRow); Bytes.writeByteArray(out, endRow); Bytes.writeByteArray(out, Bytes.toBytes(regionLocation)); @@ -162,20 +197,25 @@ */ @Override public String toString() { - return regionLocation + ":" + - Bytes.toStringBinary(startRow) + "," + Bytes.toStringBinary(endRow); + return regionLocation + ":" + Bytes.toStringBinary(startRow) + "," + + Bytes.toStringBinary(endRow); } /** * Compares this split against the given one. * - * @param split The split to compare to. + * @param split The split to compare to. * @return The result of the comparison. * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(TableSplit split) { - return Bytes.compareTo(getStartRow(), split.getStartRow()); + // If The table name of the two splits is the same then compare start row + // otherwise compare based on table names + int tableNameComparison = + Bytes.compareTo(getTableName(), split.getTableName()); + return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo( + getStartRow(), split.getStartRow()); } @Override @@ -183,18 +223,21 @@ if (o == null || !(o instanceof TableSplit)) { return false; } - return Bytes.equals(tableName, ((TableSplit)o).tableName) && - Bytes.equals(startRow, ((TableSplit)o).startRow) && - Bytes.equals(endRow, ((TableSplit)o).endRow) && - regionLocation.equals(((TableSplit)o).regionLocation); + return Bytes.equals(tableName, ((TableSplit) o).tableName) && + scan.equals(((TableSplit) o).scan) && + Bytes.equals(startRow, ((TableSplit) o).startRow) && + Bytes.equals(endRow, ((TableSplit) o).endRow) && + regionLocation.equals(((TableSplit) o).regionLocation); } - @Override - public int hashCode() { - int result = tableName != null ? Arrays.hashCode(tableName) : 0; - result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0); - result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0); - result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0); - return result; - } + @Override + public int hashCode() { + int result = tableName != null ? Arrays.hashCode(tableName) : 0; + result = 31 * result + (scan != null ? scan.hashCode() : 0); + result = 31 * result + (startRow != null ? 
Arrays.hashCode(startRow) : 0); + result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0); + result = + 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0); + return result; + } } Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (revision 1302724) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (working copy) @@ -47,7 +47,6 @@ @InterfaceStability.Stable public class TableRecordReaderImpl { - static final Log LOG = LogFactory.getLog(TableRecordReader.class); // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase @@ -66,7 +65,7 @@ /** * Restart from survivable exceptions by creating a new scanner. * - * @param firstRow The first row to start at. + * @param firstRow The first row to start at. * @throws IOException When restarting fails. */ public void restart(byte[] firstRow) throws IOException { @@ -100,7 +99,7 @@ /** * Sets the HBase table. * - * @param htable The {@link HTable} to scan. + * @param htable The {@link HTable} to scan. */ public void setHTable(HTable htable) { this.htable = htable; @@ -109,7 +108,7 @@ /** * Sets the scan defining the actual details like columns etc. * - * @param scan The scan to set. + * @param scan The scan to set. */ public void setScan(Scan scan) { this.scan = scan; @@ -132,11 +131,14 @@ /** * Closes the split. - * - * */ public void close() { this.scanner.close(); + try { + this.htable.close(); + } catch (IOException e) { + LOG.error(e); + } } /** Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (revision 1302724) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (working copy) @@ -28,6 +28,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.net.URLDecoder; +import java.util.ArrayList; import java.util.Enumeration; import java.util.HashSet; import java.util.Set; @@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; @@ -105,8 +105,8 @@ Class outputKeyClass, Class outputValueClass, Job job) throws IOException { - initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, - job, true); + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, + outputValueClass, job, true); } /** @@ -220,6 +220,68 @@ outputValueClass, job, addDependencyJars, TableInputFormat.class); } + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param tables The tables collection to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @throws IOException When setting up the details fails. 
+ */ + public static void initTableMapperJob(MultiTableInputCollection tables, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job) throws IOException { + initTableMapperJob(tables, mapper, outputKeyClass, outputValueClass, job, + true); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param tables The tables collection to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the + * configured job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(MultiTableInputCollection tables, + Class mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars) throws IOException { + job.setInputFormatClass(MultiTableInputFormat.class); + if (outputValueClass != null) + job.setMapOutputValueClass(outputValueClass); + if (outputKeyClass != null) + job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + HBaseConfiguration.addHbaseResources(job.getConfiguration()); + ArrayList inputTables = new ArrayList(); + + for (MultiTableInputCollection.TableInputConf tic : tables) { + inputTables.add(tic.getTableName()); + String scan = convertScanToString(tic.getScan()); + inputTables.add(scan); + } + job.getConfiguration().setStrings(MultiTableInputFormat.INPUT_TABLES, + inputTables.toArray(new String[inputTables.size()])); + + if (addDependencyJars) { + addDependencyJars(job); + } + initCredentials(job); + } + public static void initCredentials(Job job) throws IOException { if (User.isHBaseSecurityEnabled(job.getConfiguration())) { try { @@ -248,7 +310,7 @@ /** * Converts the given Base64 string back into a Scan instance. * - * @param base64 The scan details. + * @param base64 The scan details. * @return The newly created Scan instance. * @throws IOException When reading the scan instance fails. */ @@ -392,12 +454,12 @@ * Ensures that the given number of reduce tasks for the given job * configuration does not exceed the number of regions for the given table. * - * @param table The table to get the region count for. - * @param job The current job to adjust. + * @param table The table to get the region count for. + * @param job The current job to adjust. * @throws IOException When retrieving the table details fails. */ public static void limitNumReduceTasks(String table, Job job) - throws IOException { + throws IOException { HTable outputTable = new HTable(job.getConfiguration(), table); int regions = outputTable.getRegionsInfo().size(); if (job.getNumReduceTasks() > regions) @@ -408,12 +470,12 @@ * Sets the number of reduce tasks for the given job configuration to the * number of regions the given table has. * - * @param table The table to get the region count for. - * @param job The current job to adjust. + * @param table The table to get the region count for. + * @param job The current job to adjust. * @throws IOException When retrieving the table details fails. 
*/ public static void setNumReduceTasks(String table, Job job) - throws IOException { + throws IOException { HTable outputTable = new HTable(job.getConfiguration(), table); int regions = outputTable.getRegionsInfo().size(); job.setNumReduceTasks(regions); @@ -548,8 +610,8 @@ ClassLoader loader = my_class.getClassLoader(); String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; try { - for(Enumeration itr = loader.getResources(class_file); - itr.hasMoreElements();) { + for (Enumeration itr = loader.getResources(class_file); + itr.hasMoreElements();) { URL url = (URL) itr.nextElement(); if ("jar".equals(url.getProtocol())) { String toReturn = url.getPath(); @@ -573,5 +635,4 @@ return null; } - } Index: src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java (revision 0) @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +@InterfaceAudience.Public +public abstract class MultiTableInputFormatBase extends + InputFormat { + + final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class); + + /** Holds the input table+scanner collection. */ + private MultiTableInputCollection tables; + + /** The reader scanning the table, can be a custom one. */ + private TableRecordReader tableRecordReader = null; + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses the + * default. + * + * @param split The split to work with. + * @param context The current context. + * @return The newly created record reader. + * @throws IOException When creating the reader fails. 
+ * @throws InterruptedException when record reader initialization fails + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + TableSplit tSplit = (TableSplit) split; + + if (tSplit.getTableName() == null) { + throw new IOException("Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."); + } + HTable table = + new HTable(context.getConfiguration(), tSplit.getTableName()); + + TableRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new TableRecordReader(); + } + Scan sc = tSplit.getScan(); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setHTable(table); + trr.initialize(split, context); + return trr; + } + + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. + * + * @param context The current job context. + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List getSplits(JobContext context) throws IOException { + if (tables.isEmpty()) { + throw new IOException("No tables were provided."); + } + List splits = new ArrayList(); + + for (MultiTableInputCollection.TableInputConf tic : this.tables) { + HTable table = new HTable(context.getConfiguration(), tic.getTableName()); + Pair keys = table.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region."); + } + int count = 0; + + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + String regionLocation = + table.getRegionLocation(keys.getFirst()[i], false).getHostname(); + byte[] startRow = tic.getScan().getStartRow(); + byte[] stopRow = tic.getScan().getStopRow(); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + byte[] splitStart = + startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys + .getFirst()[i] : startRow; + byte[] splitStop = + (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], + stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys + .getSecond()[i] : stopRow; + InputSplit split = + new TableSplit(table.getTableName(), tic.getScan(), splitStart, + splitStop, regionLocation); + splits.add(split); + if (LOG.isDebugEnabled()) + LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + } + } + table.close(); + } + return splits; + } + + /** + * Test if the given region is to be included in the InputSplit while + * splitting the regions of a table. + *

+ * This optimization is effective when there is a specific reasoning to
+ * exclude an entire region from the M-R job, (and hence, not contributing to
+ * the InputSplit), given the start and end keys of the same.<br>
+ * Useful when we need to remember the last-processed top record and revisit
+ * the [last, current) interval for M-R processing, continuously. In addition
+ * to reducing InputSplits, reduces the load on the region server as well, due
+ * to the ordering of the keys.<br>
+ *
+ * Note: It is possible that <code>endKey.length() == 0</code>, for the last
+ * (recent) region.<br>
+ * Override this method, if you want to bulk exclude regions altogether from + * M-R. By default, no region is excluded( i.e. all regions are included). + * + * @param startKey Start key of the region + * @param endKey End key of the region + * @return true, if this region needs to be included as part of the input + * (default). + */ + protected boolean includeRegionInSplit(final byte[] startKey, + final byte[] endKey) { + return true; + } + + /** + * Allows subclasses to get the {@link MultiTableInputCollection}. + */ + protected MultiTableInputCollection getInputTables() { + return this.tables; + } + + /** + * Allows subclasses to set the collection of input tables and scanners to use + * + * @param tables The {@link MultiTableInputCollection} containing the input + * tables + */ + protected void setInputTables(MultiTableInputCollection tables) { + this.tables = tables; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader A different {@link TableRecordReader} + * implementation. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java (revision 0) @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * Convert HBase tabular data from multiple input tables and scanners into a + * format that is consumable by Map/Reduce. + * + *

+ * Usage example
+ * <p>

+ * + *
+ * <pre>
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ *
+ * MultiTableInputCollection mtic = new MultiTableInputCollection();
+ * mtic.Add(tableName1, scan1);
+ * mtic.Add(tableName2, scan2);
+ *
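+ * // The mapper passed to initTableMapperJob below must extend TableMapper;
+ * // it receives one (ImmutableBytesWritable, Result) pair for every row
+ * // matched by any of the registered table/scan pairs.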
+ * TableMapReduceUtil.initTableMapperJob(mtic, TableMapper.class, Text.class,
+ *     IntWritable.class, job);
+ * </pre>
+ * 
+ */ +@InterfaceAudience.Public +public class MultiTableInputFormat extends MultiTableInputFormatBase implements + Configurable { + + /** Job parameter that specifies the input tables list. */ + public static final String INPUT_TABLES = "hbase.mapreduce.inputtables"; + + /** The configuration. */ + private Configuration conf = null; + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to set the details for the tables to + * be scanned. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + String[] inputTables = conf.getStrings(INPUT_TABLES); + if (inputTables.length % 2 != 0) { + throw new IllegalArgumentException("table and scan don't match:" + + inputTables.length); + } + MultiTableInputCollection tc = new MultiTableInputCollection(); + + for (int i = 0; i < inputTables.length; i = i + 2) { + try { + tc.Add(inputTables[i], + TableMapReduceUtil.convertStringToScan(inputTables[i + 1])); + } catch (IOException e) { + e.printStackTrace(); + } + } + this.setInputTables(tc); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputCollection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputCollection.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputCollection.java (revision 0) @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * A collection of input tables and scanners to be used as the source data
+ * for a mapper
+ */
+@InterfaceAudience.Public
+public class MultiTableInputCollection implements
+    Iterable<TableInputConf> {
+  /**
+   * An internal structure to hold table names + associated scanners
+   */
+  public class TableInputConf {
+    private String tableName = null;
+    private Scan scan = null;
+
+    /**
+     * The empty constructor
+     */
+    public TableInputConf() {
+    }
+
+    /**
+     * @param tableName Input table name
+     * @param scan A Scanner associated with this table
+     */
+    public TableInputConf(String tableName, Scan scan) {
+      this.tableName = tableName;
+      this.scan = scan;
+    }
+
+    /**
+     * @return The table name
+     */
+    public String getTableName() {
+      return this.tableName;
+    }
+
+    /**
+     * Sets the table name
+     *
+     * @param tableName The table name to set
+     */
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    /**
+     * Returns the scan
+     *
+     * @return The scan
+     */
+    public Scan getScan() {
+      return this.scan;
+    }
+
+    /**
+     * Sets the scan
+     *
+     * @param scan The scan to set
+     */
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+  }
+
+  private ArrayList<TableInputConf> tables = new ArrayList<TableInputConf>();
+
+  /**
+   * Add new table and scan pair to the collection
+   *
+   * @param tableName An input table name
+   * @param scan An associated scan
+   */
+  public void Add(String tableName, Scan scan) {
+    TableInputConf tic = new TableInputConf(tableName, scan);
+    tables.add(tic);
+  }
+
+  @Override
+  public Iterator<TableInputConf> iterator() {
+    return tables.iterator();
+  }
+
+  /**
+   * Checks if the tables collection is empty
+   *
+   * @return true if empty false otherwise
+   */
+  public boolean isEmpty() {
+    return tables.isEmpty();
+  }
+}
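
To make the moving parts easier to follow, a minimal driver sketch written against the API added by this patch follows. The mapper class, table names, column family, row keys, and output path are hypothetical placeholders; only MultiTableInputCollection, its Add() method, and the new TableMapReduceUtil.initTableMapperJob(MultiTableInputCollection, ...) overload come from the patch itself.

package org.apache.hadoop.hbase.mapreduce.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputCollection;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiTableScanExample {

  /** Emits (row key, 1) for every row delivered by any of the input scans. */
  public static class RowCountMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      context.write(new Text(Bytes.toStringBinary(key.get())), ONE);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "multiTableScanExample");
    job.setJarByClass(MultiTableScanExample.class);

    // One Scan per input; each carries its own family and row range.
    Scan scan1 = new Scan();
    scan1.addFamily(Bytes.toBytes("contents"));
    Scan scan2 = new Scan();
    scan2.setStartRow(Bytes.toBytes("aaa"));
    scan2.setStopRow(Bytes.toBytes("opp"));

    // Pair each table with its scan; the same table may be added more than
    // once with different scans (the test in this patch does exactly that).
    MultiTableInputCollection mtic = new MultiTableInputCollection();
    mtic.Add("table1", scan1);
    mtic.Add("table2", scan2);

    // New overload from this patch: sets MultiTableInputFormat as the input
    // format and serializes the table/scan pairs into the job configuration
    // under hbase.mapreduce.inputtables.
    TableMapReduceUtil.initTableMapperJob(mtic, RowCountMapper.class,
        Text.class, IntWritable.class, job);

    // Map-only job; map output goes straight to the default text output.
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/multiTableScanExample"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}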