Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java	(revision 833436)
+++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java	(working copy)
@@ -21,19 +21,31 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Jdk14Logger;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
 
 /**
  * Facility for testing HBase. Added as tool to abet junit4 testing. Replaces
@@ -45,11 +58,14 @@
  * make changes to configuration parameters.
  */
 public class HBaseTestingUtility {
+  private final Log LOG = LogFactory.getLog(getClass());
+
   private final HBaseConfiguration conf = new HBaseConfiguration();
   private MiniZooKeeperCluster zkCluster = null;
   private MiniDFSCluster dfsCluster = null;
   private MiniHBaseCluster hbaseCluster = null;
+  private MiniMRCluster mrCluster = null;
   private File clusterTestBuildDir = null;
 
   /** System property key to get test directory value.
@@ -185,7 +201,7 @@
   }
 
   /**
-   * Flusheds all caches in the mini hbase cluster
+   * Flushes all caches in the mini hbase cluster
    * @throws IOException
    */
   public void flush() throws IOException {
@@ -200,7 +216,7 @@
   * @return An HTable instance for the created table.
   * @throws IOException
   */
-  public HTable createTable(byte [] tableName, byte [] family)
+  public HTable createTable(byte[] tableName, byte[] family)
   throws IOException{
     return createTable(tableName, new byte[][]{family});
   }
@@ -212,10 +228,10 @@
   * @return An HTable instance for the created table.
   * @throws IOException
   */
-  public HTable createTable(byte [] tableName, byte [][] families)
+  public HTable createTable(byte[] tableName, byte[][] families)
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    for(byte [] family : families) {
+    for(byte[] family : families) {
       desc.addFamily(new HColumnDescriptor(family));
     }
     (new HBaseAdmin(getConfiguration())).createTable(desc);
@@ -230,7 +246,7 @@
   * @return An HTable instance for the created table.
   * @throws IOException
   */
-  public HTable createTable(byte [] tableName, byte [] family, int numVersions)
+  public HTable createTable(byte[] tableName, byte[] family, int numVersions)
   throws IOException {
     return createTable(tableName, new byte[][]{family}, numVersions);
   }
@@ -243,11 +259,11 @@
   * @return An HTable instance for the created table.
  * @throws IOException
  */
-  public HTable createTable(byte [] tableName, byte [][] families,
+  public HTable createTable(byte[] tableName, byte[][] families,
     int numVersions)
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    for (byte [] family : families) {
+    for (byte[] family : families) {
       HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
         HColumnDescriptor.DEFAULT_COMPRESSION,
         HColumnDescriptor.DEFAULT_IN_MEMORY,
@@ -267,12 +283,12 @@
   * @return An HTable instance for the created table.
   * @throws IOException
   */
-  public HTable createTable(byte [] tableName, byte [][] families,
-    int [] numVersions)
+  public HTable createTable(byte[] tableName, byte[][] families,
+    int[] numVersions)
   throws IOException {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     int i = 0;
-    for (byte [] family : families) {
+    for (byte[] family : families) {
       HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions[i],
         HColumnDescriptor.DEFAULT_COMPRESSION,
         HColumnDescriptor.DEFAULT_IN_MEMORY,
@@ -292,17 +308,17 @@
   * @return Count of rows loaded.
   * @throws IOException
   */
-  public int loadTable(final HTable t, final byte [] f) throws IOException {
+  public int loadTable(final HTable t, final byte[] f) throws IOException {
     byte[] k = new byte[3];
     int rowCount = 0;
-    for (byte b1 = 'a'; b1 < 'z'; b1++) {
-      for (byte b2 = 'a'; b2 < 'z'; b2++) {
-        for (byte b3 = 'a'; b3 < 'z'; b3++) {
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
           k[0] = b1;
           k[1] = b2;
           k[2] = b3;
           Put put = new Put(k);
-          put.add(f, new byte[0], k);
+          put.add(f, null, k);
           t.put(put);
           rowCount++;
         }
@@ -310,4 +326,149 @@
     }
     return rowCount;
   }
+
+  /**
+   * Creates many regions named "aaa" to "zzz".
+   *
+   * @param table  The table to use for the data.
+   * @param columnFamily  The family to insert the data into.
+   * @throws IOException When creating the regions fails.
+   */
+  public void createMultiRegions(HTable table, byte[] columnFamily)
+  throws IOException {
+    byte[][] KEYS = {
+      HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
+      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
+      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
+      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
+      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
+      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
+      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
+      Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
+    };
+
+    HBaseConfiguration c = getConfiguration();
+    HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+    if(!htd.hasFamily(columnFamily)) {
+      HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+      htd.addFamily(hcd);
+    }
+    // remove empty region - this is tricky as the mini cluster during the test
+    // setup already has the "<tablename>,,123456789" row with an empty start
+    // and end key. Adding the custom regions below adds those blindly,
+    // including the new start region from empty to "bbb".
+    List<byte[]> rows = getMetaTableRows();
+    // add custom ones
+    for (int i = 0; i < KEYS.length; i++) {
+      int j = (i + 1) % KEYS.length;
+      HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
+        KEYS[i], KEYS[j]);
+      Put put = new Put(hri.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(hri));
+      meta.put(put);
+      LOG.info("createMultiRegions: inserted " + hri.toString());
+    }
+    // see comment above, remove "old" (or previous) single region
+    for (byte[] row : rows) {
+      LOG.info("createMultiRegions: deleting meta row -> " +
+        Bytes.toStringBinary(row));
+      meta.delete(new Delete(row));
+    }
+    // flush cache of regions
+    HBaseAdmin admin = new HBaseAdmin(getConfiguration());
+    HConnection conn = admin.getConnection();
+    conn.clearRegionCache();
+  }
+
+  /**
+   * Returns all rows from the .META. table.
+   *
+   * @throws IOException When reading the rows fails.
+   */
+  public List<byte[]> getMetaTableRows() throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    List<byte[]> rows = new ArrayList<byte[]>();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      LOG.info("getMetaTableRows: row -> " +
+        Bytes.toStringBinary(result.getRow()));
+      rows.add(result.getRow());
+    }
+    s.close();
+    return rows;
+  }
+
+  /**
+   * Removes all rows from the .META. in preparation to add custom ones.
+   *
+   * @throws IOException When removing the rows fails.
+   */
+  private void emptyMetaTable() throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    ArrayList<Delete> deletes = new ArrayList<Delete>();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      LOG.info("emptyMetaTable: remove row -> " +
+        Bytes.toStringBinary(result.getRow()));
+      Delete del = new Delete(result.getRow());
+      deletes.add(del);
+    }
+    s.close();
+    t.delete(deletes);
+  }
+
+  /**
+   * Starts a MiniMRCluster with a default number of
+   * TaskTrackers.
+   *
+   * @throws IOException When starting the cluster fails.
+   */
+  public void startMiniMapReduceCluster() throws IOException {
+    startMiniMapReduceCluster(2);
+  }
+
+  /**
+   * Starts a MiniMRCluster.
+   *
+   * @param servers  The number of TaskTrackers to start.
+   * @throws IOException When starting the cluster fails.
+   */
+  public void startMiniMapReduceCluster(final int servers) throws IOException {
+    LOG.info("Starting mini mapreduce cluster...");
+    // These are needed for the new and improved Map/Reduce framework
+    Configuration c = getConfiguration();
+    System.setProperty("hadoop.log.dir", c.get("hadoop.log.dir"));
+    c.set("mapred.output.dir", c.get("hadoop.tmp.dir"));
+    mrCluster = new MiniMRCluster(servers,
+      FileSystem.get(c).getUri().toString(), 1);
+    LOG.info("Mini mapreduce cluster started");
+  }
+
+  /**
+   * Stops the previously started MiniMRCluster.
+   */
+  public void shutdownMiniMapReduceCluster() {
+    LOG.info("Stopping mini mapreduce cluster...");
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+    LOG.info("Mini mapreduce cluster stopped");
+  }
+
+  /**
+   * Switches the logger for the given class to DEBUG level.
+   *
+   * @param clazz  The class for which to switch to debug logging.
+   */
+  public void enableDebug(Class<?> clazz) {
+    Log l = LogFactory.getLog(clazz);
+    if (l instanceof Log4JLogger) {
+      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
+    } else if (l instanceof Jdk14Logger) {
+      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
+    }
+  }
 }
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java	(revision 833436)
+++ src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java	(working copy)
@@ -20,17 +20,18 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -269,29 +270,40 @@
    */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (table == null) {
      throw new IOException("No table was provided.");
    }
-    byte [][] startKeys = table.getStartKeys();
-    if (startKeys == null || startKeys.length == 0) {
-      throw new IOException("Expecting at least one region.");
-    }
-    int realNumSplits = startKeys.length;
-    InputSplit[] splits = new InputSplit[realNumSplits];
-    int middle = startKeys.length / realNumSplits;
-    int startPos = 0;
-    for (int i = 0; i < realNumSplits; i++) {
-      int lastPos = startPos + middle;
-      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
-      String regionLocation = table.getRegionLocation(startKeys[startPos]).
-        getServerAddress().getHostname();
-      splits[i] = new TableSplit(this.table.getTableName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ?
-          startKeys[lastPos] : HConstants.EMPTY_START_ROW, regionLocation);
-      LOG.info("split: " + i + "->" + splits[i]);
-      startPos = lastPos;
-    }
-    return Arrays.asList(splits);
+    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+    if (keys == null || keys.getFirst() == null ||
+        keys.getFirst().length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    int count = 0;
+    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+    for (int i = 0; i < keys.getFirst().length; i++) {
+      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
+        getServerAddress().getHostname();
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
+      // determine if the given start and stop keys fall into the region
+      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+          (stopRow.length == 0 ||
+          Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+        byte[] splitStart = startRow.length == 0 ||
+          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+            keys.getFirst()[i] : startRow;
+        byte[] splitStop = stopRow.length == 0 ||
+          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0 ?
+            keys.getSecond()[i] : stopRow;
+        InputSplit split = new TableSplit(table.getTableName(),
+          splitStart, splitStop, regionLocation);
+        splits.add(split);
+        if (LOG.isDebugEnabled())
+          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+      }
+    }
+    return splits;
  }

  /**
Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(revision 833436)
+++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java	(working copy)
@@ -836,6 +836,13 @@
      return result;
    }

+    /**
+     * Allows flushing the region cache.
+     */
+    public void clearRegionCache() {
+      cachedRegionLocations.clear();
+    }
+
    /*
     * Put a newly discovered HRegionLocation into the cache.
     */
Index: src/java/org/apache/hadoop/hbase/client/HConnection.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HConnection.java	(revision 833436)
+++ src/java/org/apache/hadoop/hbase/client/HConnection.java	(working copy)
@@ -111,6 +111,11 @@
  throws IOException;

  /**
+   * Allows flushing the region cache.
+   */
+  public void clearRegionCache();
+
+  /**
   * Find the location of the region of <i>tableName</i> that <i>row</i>
   * lives in, ignoring any value that might be in the cache.
   * @param tableName name of the table <i>row</i> is in
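
Reviewer note (not part of the patch): a minimal sketch of how a test could
exercise the helpers added above. The table and family names are made up, and
the sketch assumes the startMiniCluster()/shutdownMiniCluster() lifecycle that
HBaseTestingUtility already provides; treat it as an illustration under those
assumptions, not as code from this change.

    // Hypothetical usage; identifiers below are illustrative only.
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster();                  // existing helper: DFS, ZK, HBase
    util.startMiniMapReduceCluster();         // new: two TaskTrackers by default
    try {
      byte[] family = Bytes.toBytes("contents");
      HTable table = util.createTable(Bytes.toBytes("mrtest"), family);
      util.createMultiRegions(table, family); // new: replaces the single region
                                              // with 25 regions, "" through "yyy"
      int rowCount = util.loadTable(table, family);
      // Run a TableInputFormat-based job here; with the rewritten getSplits()
      // the job gets one split per region that intersects the scan's start/stop
      // rows, instead of evenly chopped start-key ranges.
    } finally {
      util.shutdownMiniMapReduceCluster();    // new
      util.shutdownMiniCluster();             // existing helper
    }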