Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java	(revision 0)
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Convert HBase tabular data from multiple scanners into a format that
+ * is consumable by Map/Reduce.
+ *
+ * <p>
+ * Usage example
+ * </p>
+ *
+ * <pre>
+ * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
+ * 
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
+ * scans.add(scan1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
+ * scans.add(scan2);
+ *
+ * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
+ *     IntWritable.class, job);
+ * </pre>
+ */
+@InterfaceAudience.Public
+public class MultiTableInputFormat extends MultiTableInputFormatBase implements
+    Configurable {
+
+  /** Job parameter that specifies the scan list. */
+  public static final String SCANS = "hbase.mapreduce.scans";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to set the details for the tables to
+   * be scanned.
+   *
+   * @param configuration The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *      org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String[] rawScans = conf.getStrings(SCANS);
+    if (rawScans == null || rawScans.length <= 0) {
+      throw new IllegalArgumentException("There must be at least 1 scan configuration set to: "
+          + SCANS);
+    }
+    List<Scan> scans = new ArrayList<Scan>();
+
+    for (int i = 0; i < rawScans.length; i++) {
+      try {
+        scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to convert String to Scan", e);
+      }
+    }
+    this.setScans(scans);
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java	(revision 0)
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A base for {@link MultiTableInputFormat}s. Receives a list of
+ * {@link Scan} instances that define the input tables and
+ * filters etc.<br>
+ * Subclasses may use other TableRecordReader implementations.
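+ * <p>
+ * As a minimal sketch (MyTableRecordReader is a hypothetical
+ * TableRecordReader subclass, not part of this patch), a custom reader can
+ * be installed from the subclass constructor:
+ * <pre>
+ * public class MyMultiTableInputFormat extends MultiTableInputFormatBase {
+ *   public MyMultiTableInputFormat() {
+ *     // e.g. a reader that post-processes each row
+ *     setTableRecordReader(new MyTableRecordReader());
+ *   }
+ * }
+ * </pre>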
+ */
+@InterfaceAudience.Public
+public abstract class MultiTableInputFormatBase extends
+    InputFormat<ImmutableBytesWritable, Result> {
+
+  final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
+
+  /** Holds the set of scans used to define the input. */
+  private List<Scan> scans;
+
+  /** The reader scanning the table, can be a custom one. */
+  private TableRecordReader tableRecordReader = null;
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   *
+   * @param split The split to work with.
+   * @param context The current context.
+   * @return The newly created record reader.
+   * @throws IOException When creating the reader fails.
+   * @throws InterruptedException when record reader initialization fails
+   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+   *      org.apache.hadoop.mapreduce.InputSplit,
+   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    TableSplit tSplit = (TableSplit) split;
+
+    if (tSplit.getTableName() == null) {
+      throw new IOException("Cannot create a record reader because of a"
+          + " previous error. Please look at the previous log lines from"
+          + " the task's full log for more details.");
+    }
+    HTable table =
+        new HTable(context.getConfiguration(), tSplit.getTableName());
+
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    Scan sc = tSplit.getScan();
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.initialize(split, context);
+    return trr;
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks. There
+   * is one split per region that a scan covers.
+   *
+   * @param context The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (scans.isEmpty()) {
+      throw new IOException("No scans were provided.");
+    }
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    for (Scan scan : scans) {
+      byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
+      if (tableName == null)
+        throw new IOException("A scan object did not have a table name");
+      HTable table = new HTable(context.getConfiguration(), tableName);
+      Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+      if (keys == null || keys.getFirst() == null ||
+          keys.getFirst().length == 0) {
+        throw new IOException("Expecting at least one region.");
+      }
+      int count = 0;
+
+      for (int i = 0; i < keys.getFirst().length; i++) {
+        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+          continue;
+        }
+        String regionLocation =
+            table.getRegionLocation(keys.getFirst()[i], false).getHostname();
+        byte[] startRow = scan.getStartRow();
+        byte[] stopRow = scan.getStopRow();
+        // determine if the given start and stop keys fall into the region
+        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+            (stopRow.length == 0 ||
+            Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+          byte[] splitStart =
+              startRow.length == 0 ||
+                  Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
+                  .getFirst()[i] : startRow;
+          byte[] splitStop =
+              (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
+                  stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
+                  .getSecond()[i] : stopRow;
+          InputSplit split =
+              new TableSplit(tableName, scan, splitStart,
+                  splitStop, regionLocation);
+          splits.add(split);
+          if (LOG.isDebugEnabled())
+            LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+        }
+      }
+      table.close();
+    }
+    return splits;
+  }
+
+  /**
+   * Test if the given region is to be included in the InputSplit while
+   * splitting the regions of a table.
+   * <p>
+   * This optimization is effective when there is a specific reason to exclude
+   * an entire region from the M-R job (and hence from the InputSplits), based
+   * on the start and end keys of that region.<br>
+   * Useful when an application needs to remember the last-processed top
+   * record and continuously revisit the [last, current) interval for M-R
+   * processing. Besides reducing the number of InputSplits, this also lowers
+   * the load on the region servers, thanks to the ordering of the keys.
+   * <p>
+   * Note: It is possible that <code>endKey.length() == 0</code>, for the
+   * last (most recent) region.<br>
+   * Override this method if you want to bulk exclude regions altogether from
+   * M-R. By default, no region is excluded (i.e. all regions are included).
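+   * <p>
+   * A minimal sketch of an override (the cut-off row "opp" is a made-up
+   * example): include only regions that start strictly before the cut-off,
+   * skipping all later ones.
+   * <pre>
+   * &#64;Override
+   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
+   *   return Bytes.compareTo(startKey, Bytes.toBytes("opp")) &lt; 0;
+   * }
+   * </pre>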
+   *
+   * @param startKey Start key of the region
+   * @param endKey End key of the region
+   * @return true, if this region needs to be included as part of the input
+   *         (default).
+   */
+  protected boolean includeRegionInSplit(final byte[] startKey,
+      final byte[] endKey) {
+    return true;
+  }
+
+  /**
+   * Allows subclasses to get the list of {@link Scan} objects.
+   */
+  protected List<Scan> getScans() {
+    return this.scans;
+  }
+
+  /**
+   * Allows subclasses to set the list of {@link Scan} objects.
+   *
+   * @param scans The list of {@link Scan} used to define the input
+   */
+  protected void setScans(List<Scan> scans) {
+    this.scans = scans;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader A different {@link TableRecordReader}
+   *          implementation.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(revision 1429079)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(working copy)
@@ -23,8 +23,10 @@
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -45,6 +47,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -218,7 +221,66 @@
     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
         job, addDependencyJars, TableInputFormat.class);
   }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately
+   * set up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+        true);
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately
+   * set up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the
+   *          configured job classes via the distributed cache (tmpjars).
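+   * <p>
+   * A minimal usage sketch (MyMapper, tableName and the key/value classes
+   * are placeholders, mirroring the example on {@link MultiTableInputFormat}):
+   * <pre>
+   * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
+   * Scan scan = new Scan();
+   * scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
+   * scans.add(scan);
+   * TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
+   *     Text.class, IntWritable.class, job, true);
+   * </pre>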
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job,
+      boolean addDependencyJars) throws IOException {
+    job.setInputFormatClass(MultiTableInputFormat.class);
+    if (outputValueClass != null)
+      job.setMapOutputValueClass(outputValueClass);
+    if (outputKeyClass != null)
+      job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    HBaseConfiguration.addHbaseResources(job.getConfiguration());
+    List<String> scanStrings = new ArrayList<String>();
+
+    for (Scan scan : scans) {
+      scanStrings.add(convertScanToString(scan));
+    }
+    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+        scanStrings.toArray(new String[scanStrings.size()]));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+  }
+
   public static void initCredentials(Job job) throws IOException {
     if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
       try {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java	(revision 1429079)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java	(working copy)
@@ -26,27 +26,63 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
- * A table split corresponds to a key range (low, high). All references to row
- * below refer to the key of the row.
+ * A table split corresponds to a key range (low, high) and an optional scanner.
+ * All references to row below refer to the key of the row.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
 public class TableSplit extends InputSplit
 implements Writable, Comparable<TableSplit> {
+
+  private static final Log LOG = LogFactory.getLog(TableSplit.class);
+
+  // should be < 0 (@see #readFields(DataInput))
+  // version 1 supports Scan data member
+  enum Version {
+    UNVERSIONED(0),
+    // Initial number we put on TableSplit when we introduced versioning.
+    INITIAL(-1);
+
+    final int code;
+    static final Version[] byCode;
+    static {
+      byCode = Version.values();
+      for (int i = 0; i < byCode.length; i++) {
+        if (byCode[i].code != -1 * i) {
+          throw new AssertionError("Values in this enum should be descending by one");
+        }
+      }
+    }
+
+    Version(int code) {
+      this.code = code;
+    }
+
+    boolean atLeast(Version other) {
+      return code <= other.code;
+    }
+
+    static Version fromCode(int code) {
+      return byCode[code * -1];
+    }
+  }
+
+  private static final Version VERSION = Version.INITIAL;
   private byte [] tableName;
   private byte [] startRow;
   private byte [] endRow;
   private String regionLocation;
+  private String scan = ""; // stores the serialized form of the Scan
 
   /** Default constructor. */
   public TableSplit() {
-    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+    this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY,
       HConstants.EMPTY_BYTE_ARRAY, "");
   }
 
@@ -54,19 +90,49 @@
    * Creates a new instance while assigning all variables.
    *
    * @param tableName The name of the current table.
+   * @param scan A scan associated with this split.
    * @param startRow The start row of the split.
    * @param endRow The end row of the split.
    * @param location The location of the region.
    */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+  public TableSplit(byte [] tableName, Scan scan, byte [] startRow, byte [] endRow,
       final String location) {
     this.tableName = tableName;
+    try {
+      this.scan =
+          (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
+    } catch (IOException e) {
+      LOG.warn("Failed to convert Scan to String", e);
+    }
     this.startRow = startRow;
    this.endRow = endRow;
     this.regionLocation = location;
   }
+
+  /**
+   * Creates a new instance without a scanner.
+   *
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
+   */
+  public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow,
+      final String location) {
+    this(tableName, null, startRow, endRow, location);
+  }
 
   /**
+   * Returns a Scan object from the stored string representation.
+   *
+   * @return Returns a Scan object based on the stored scanner.
+   * @throws IOException When the Scan cannot be reconstructed from its
+   *           serialized string form.
+   */
+  public Scan getScan() throws IOException {
+    return TableMapReduceUtil.convertStringToScan(this.scan);
+  }
+
+  /**
    * Returns the table name.
    *
    * @return The table name.
@@ -133,10 +199,29 @@
    */
   @Override
   public void readFields(DataInput in) throws IOException {
-    tableName = Bytes.readByteArray(in);
+    Version version = Version.UNVERSIONED;
+    // TableSplit was not versioned in the beginning.
+    // In order to introduce it now, we make use of the fact
+    // that tableName was written with Bytes.writeByteArray,
+    // which encodes the array length as a vint which is >= 0.
+    // Hence if the vint is >= 0 we have an old version and the vint
+    // encodes the length of tableName.
+    // If < 0 we just read the version and the next vint is the length.
+    // @see Bytes#readByteArray(DataInput)
+    int len = WritableUtils.readVInt(in);
+    if (len < 0) {
+      // what we just read was the version
+      version = Version.fromCode(len);
+      len = WritableUtils.readVInt(in);
+    }
+    tableName = new byte[len];
+    in.readFully(tableName);
     startRow = Bytes.readByteArray(in);
     endRow = Bytes.readByteArray(in);
     regionLocation = Bytes.toString(Bytes.readByteArray(in));
+    if (version.atLeast(Version.INITIAL)) {
+      // the serialized Scan string was appended in Version.INITIAL
+      scan = Bytes.toString(Bytes.readByteArray(in));
+    }
   }
 
   /**
@@ -147,10 +232,12 @@
    */
   @Override
   public void write(DataOutput out) throws IOException {
+    // write the (negative) version marker first; see readFields(DataInput)
+    WritableUtils.writeVInt(out, VERSION.code);
     Bytes.writeByteArray(out, tableName);
     Bytes.writeByteArray(out, startRow);
     Bytes.writeByteArray(out, endRow);
     Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
+    Bytes.writeByteArray(out, Bytes.toBytes(scan));
   }
 
   /**
@@ -174,7 +261,12 @@
    */
   @Override
   public int compareTo(TableSplit split) {
-    return Bytes.compareTo(getStartRow(), split.getStartRow());
+    // If the table names of the two splits are the same, compare the start
+    // rows; otherwise order by table name.
+    int tableNameComparison =
+        Bytes.compareTo(getTableName(), split.getTableName());
+    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
+        getStartRow(), split.getStartRow());
   }
 
   @Override
@@ -191,6 +283,7 @@
   @Override
   public int hashCode() {
     int result = tableName != null ? Arrays.hashCode(tableName) : 0;
+    result = 31 * result + (scan != null ? scan.hashCode() : 0);
     result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
     result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
     result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java	(revision 0)
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests various scan start and stop row scenarios. The rows are set on the
+ * scans and then verified in a MapReduce job, to check that they are handed
+ * over and honored properly.
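+ * <p>
+ * A rough sketch of how to run just this large test (the exact flags depend
+ * on the local build setup): <code>mvn test -Dtest=TestMultiTableInputFormat</code>.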
+ */
+@Category(LargeTests.class)
+public class TestMultiTableInputFormat {
+
+  static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final String TABLE_NAME = "scantest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final String KEY_STARTROW = "startRow";
+  static final String KEY_LASTROW = "lastRow";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch the multi-table input format classes to log at DEBUG level
+    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
+    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill the tables
+    for (int i = 0; i < 3; i++) {
+      HTable table =
+          TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME + String.valueOf(i)),
+              INPUT_FAMILY);
+      TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+      TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    }
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+  }
+
+  /**
+   * Passes the key and value to the reducer.
+   */
+  public static class ScanMapper extends
+      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+    /**
+     * Pass the key and value to reduce.
+     *
+     * @param key The key, here "aaa", "aab" etc.
+     * @param value The value is the same as the key.
+     * @param context The task context.
+     * @throws IOException When reading the rows fails.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
+          value.getMap();
+      if (!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
+          ", value -> " + val);
+      context.write(key, key);
+    }
+  }
+
+  /**
+   * Checks the last and first keys seen against the scanner boundaries.
+   */
+  public static class ScanReducer
+      extends
+      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+          NullWritable, NullWritable> {
+    private String first = null;
+    private String last = null;
+
+    protected void reduce(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.debug("reduce: key[" + count + "] -> " +
+            Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+      // each of the three tables contributes one value per key
+      assertEquals(3, count);
+    }
+
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
+          startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
+          "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+  }
+
+  @Test
+  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  @Test
+  public void testScanEmptyToAPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  @Test
+  public void testScanOBBToOPP() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  @Test
+  public void testScanOPPToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("opp", null, "zzz");
+  }
+
+  @Test
+  public void testScanYZYToEmpty() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    testScan("yzy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan using specific start and stop rows.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  private void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName =
+        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
+            (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    List<Scan> scans = new ArrayList<Scan>();
+
+    for (int i = 0; i < 3; i++) {
+      Scan scan = new Scan();
+
+      scan.addFamily(INPUT_FAMILY);
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));
+
+      if (start != null) {
+        scan.setStartRow(Bytes.toBytes(start));
+      }
+      if (stop != null) {
+        scan.setStopRow(Bytes.toBytes(stop));
+      }
+
+      scans.add(scan);
+
+      LOG.info("scan before: " + scan);
+    }
+
+    Job job = new Job(c, jobName);
+
+    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java	(revision 1429079)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java	(working copy)
@@ -95,11 +95,12 @@
 
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
-  static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
+  public static final String SCAN_ATTRIBUTES_METRICS_ENABLE =
       "scan.attributes.metrics.enable";
-  static public String SCAN_ATTRIBUTES_METRICS_DATA =
+  public static final String SCAN_ATTRIBUTES_METRICS_DATA =
       "scan.attributes.metrics.data";
-
+  public static final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
+
   /*
    * -1 means no caching
    */