Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 74) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -41,6 +41,7 @@ private Thread masterThread; List regionServers; List regionThreads; + private boolean deleteOnExit = true; /** * Starts a MiniHBaseCluster on top of a new MiniDFSCluster @@ -51,10 +52,21 @@ */ public MiniHBaseCluster(Configuration conf, int nRegionNodes) throws IOException { - this(conf, nRegionNodes, true); + this(conf, nRegionNodes, true, true, true); } /** + * + * @param conf + * @param nRegionNodes + * @param miniHdfsFilesystem + * @throws IOException + */ + public MiniHBaseCluster(Configuration conf, int nRegionNodes, final boolean miniHdfsFilesystem) throws IOException { + this(conf, nRegionNodes, miniHdfsFilesystem, true, true); + } + + /** * Starts a MiniHBaseCluster on top of an existing HDFSCluster * * @param conf @@ -78,53 +90,59 @@ * @param miniHdfsFilesystem If true, set the hbase mini * cluster atop a mini hdfs cluster. Otherwise, use the * filesystem configured in conf. + * @param format the mini hdfs cluster + * @param deleteOnExit clean up mini hdfs files * @throws IOException */ public MiniHBaseCluster(Configuration conf, int nRegionNodes, - final boolean miniHdfsFilesystem) + final boolean miniHdfsFilesystem, boolean format, boolean deleteOnExit) throws IOException { this.conf = conf; + this.deleteOnExit = deleteOnExit; if (miniHdfsFilesystem) { - try { - this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null); - } catch(Throwable t) { - LOG.error("Failed setup of mini dfs cluster", t); - t.printStackTrace(); - return; + try { + this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null); + + } catch(Throwable t) { + LOG.error("Failed setup of mini dfs cluster", t); + t.printStackTrace(); + return; + } } - } - init(nRegionNodes); + init(nRegionNodes); } - + private void init(int nRegionNodes) throws IOException { - try { - try { - this.fs = FileSystem.get(conf); - this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); - fs.mkdirs(parentdir); + try { + try { + this.fs = FileSystem.get(conf); + this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); + fs.mkdirs(parentdir); } catch(IOException e) { LOG.error("Failed setup of FileSystem", e); throw e; } - if(this.conf.get(MASTER_ADDRESS) == null) { - this.conf.set(MASTER_ADDRESS, "localhost:0"); - } - - // Create the master - this.master = new HMaster(conf); - this.masterThread = new Thread(this.master, "HMaster"); + if(this.conf.get(MASTER_ADDRESS) == null) { + this.conf.set(MASTER_ADDRESS, "localhost:0"); + } - // Start up the master - LOG.info("Starting HMaster"); - masterThread.start(); - - // Set the master's port for the HRegionServers - String address = master.getMasterAddress().toString(); - this.conf.set(MASTER_ADDRESS, address); + // Create the master + this.master = new HMaster(conf); + this.masterThread = new Thread(this.master, "HMaster"); + // Start up the master + LOG.info("Starting HMaster"); + masterThread.start(); + + // Set the master's port for the HRegionServers + String address = master.getMasterAddress().toString(); + this.conf.set(MASTER_ADDRESS, address); + + // Start the HRegionServers + // Start the HRegionServers. 
Always have regionservers come up on // port '0' so there won't be clashes over default port as unit tests // start/stop ports at different times during the life of the test. @@ -137,6 +155,15 @@ } } + /** + * Get the cluster on which this HBase cluster is running + * + * @return + */ + public MiniDFSCluster getDFSCluster() { + return cluster; + } + private void startRegionServers(final int nRegionNodes) throws IOException { this.regionServers = new ArrayList(nRegionNodes); @@ -145,7 +172,7 @@ startRegionServer(); } } - + void startRegionServer() throws IOException { HRegionServer hsr = new HRegionServer(this.conf); this.regionServers.add(hsr); @@ -159,9 +186,9 @@ * the supplied port is not necessarily the actual port used. */ public HServerAddress getHMasterAddress() { - return master.getMasterAddress(); + return master.getMasterAddress(); } - + /** * Shut down the specified region server cleanly * @@ -174,7 +201,7 @@ } this.regionServers.get(serverNumber).stop(); } - + public void waitOnRegionServer(int serverNumber) { if (serverNumber >= regionServers.size()) { throw new ArrayIndexOutOfBoundsException( @@ -199,7 +226,7 @@ } this.regionServers.get(serverNumber).abort(); } - + /** Shut down the HBase cluster */ public void shutdown() { LOG.info("Shutting down the HBase Cluster"); @@ -218,8 +245,8 @@ } try { masterThread.join(); + } catch(InterruptedException e) { - // continue } LOG.info("HBase Cluster shutdown complete"); @@ -229,8 +256,9 @@ } // Delete all DFS files - deleteFile(new File(System.getProperty( - StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs")); + if(deleteOnExit) + deleteFile(new File(System.getProperty( + StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs")); } private void deleteFile(File f) { @@ -242,4 +270,4 @@ } f.delete(); } -} \ No newline at end of file +} Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputCollector.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputCollector.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputCollector.java (revision 70) @@ -0,0 +1,26 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.io.RecordWritable; +import org.apache.hadoop.io.Text; + +/** + * Refine the types that can be collected from a Table Map/Reduce jobs. + */ +public class TableOutputCollector { + public OutputCollector collector; + + /** + * Restrict Table Map/Reduce's output to be a Text key and a record. 
+   *
+   * @param key
+   * @param value
+   * @throws IOException
+   */
+  public void collect(Text key, RecordWritable value) throws IOException
+  {
+    collector.collect(key, value);
+  }
+
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableReduce.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableReduce.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableReduce.java (revision 70)
@@ -0,0 +1,66 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+/**
+ * Write reduce output to an HBase table; keys arrive sorted by the map output key
+ *
+ */
+public abstract class TableReduce extends MapReduceBase implements Reducer {
+
+  private static final Logger LOG =
+    Logger.getLogger(TableReduce.class.getName());
+  TableOutputCollector m_collector;
+
+  public TableReduce() {
+    m_collector = new TableOutputCollector();
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table, Class reducer, JobConf job) {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+  }
+
+  /**
+   * Casts the key to Text and delegates to the Text-based reduce,
+   * routing its output through the shared TableOutputCollector.
+   */
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+    LOG.info("start reduce");
+    if(m_collector.collector == null) m_collector.collector = output;
+    reduce((Text)key, values, m_collector, reporter);
+    LOG.info("end reduce");
+  }
+
+  /**
+   * Reduce the values for a given key, writing output to the table collector.
+   *
+   * @param key
+   * @param values
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void reduce(Text key, Iterator values,
+      TableOutputCollector output, Reporter reporter) throws IOException;
+
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableMap.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableMap.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableMap.java (revision 70)
@@ -0,0 +1,29 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.RecordWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Pass the given key and record as-is to reduce
+ *
+ */
+public class IdentityTableMap extends TableMap {
+
+  public IdentityTableMap() { super(); }
+
+  /**
+   * Pass the key, value to reduce
+   */
+  @Override
+  public void map(HStoreKey key, RecordWritable value,
+      TableOutputCollector output, Reporter reporter) throws IOException {
+    Text tKey = key.getRow();
+    output.collect(tKey, value);
+  }
+
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableInputFormat.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableInputFormat.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableInputFormat.java (revision 70) @@ -0,0 +1,205 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.TreeMap; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.RecordWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HClient; +import org.apache.hadoop.hbase.HScannerInterface; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.KeyedData; +import org.apache.log4j.Logger; + +/** + * Convert HBase tabular data into a format that is consumable by Map/Reduce + */ +public class TableInputFormat implements InputFormat, JobConfigurable { + + private static final Logger LOG = + Logger.getLogger(TableInputFormat.class.getName()); + + // space delimited list of columns + // @see org.apache.hadoop.hbase.HAbstractScanner for column name wildcards + public static final String COLUMN_LIST = "TABLE_COLUMNS"; + private Text m_tableName; + private Text[] m_cols; + private HClient m_client; + + /** + * Iterate over an HBase table data, return (HStoreKey, RecordWritable) pairs + * + * @author Vuk + * + */ + class TableRecordReader implements RecordReader + { + private HScannerInterface m_scanner; + private TreeMap m_row; // current buffer + private Text m_endRow; + + /** + * + * @param startRow (inclusive) + * @param endRow (exclusive) + * @throws IOException + */ + public TableRecordReader(Text startRow, Text endRow) + throws IOException + { + LOG.info("start construct"); + m_row = new TreeMap(); + if(startRow == null) + m_scanner = m_client.obtainScanner(m_cols, new Text()); + else + m_scanner = m_client.obtainScanner(m_cols, startRow); + m_endRow = endRow; + LOG.info("end construct"); + } + + public void close() throws IOException { + LOG.info("start close"); + m_scanner.close(); + LOG.info("end close"); + } + + /** + * HStoreKey + */ + public WritableComparable createKey() { + return new HStoreKey(); + } + + /** + * RecordWritable of KeyedData + */ + public Writable createValue() { + return new RecordWritable(); + } + + public long getPos() throws IOException { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + public float getProgress() throws IOException { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * Expecting HStoreKey as input key. 
+     * Expecting RecordWritable as input value
+     *
+     * Converts HClientScanner (HStoreKey, TreeMap(Text, BytesWritable)) to
+     * (HStoreKey, RecordWritable)
+     */
+    public boolean next(Writable key, Writable value) throws IOException {
+      LOG.info("start next");
+      m_row.clear();
+      HStoreKey tKey = (HStoreKey)key;
+      boolean hasMore = m_scanner.next(tKey, m_row);
+
+      if(hasMore) {
+        if(m_endRow != null && (tKey.getRow().compareTo(m_endRow) >= 0)) { // end row is exclusive
+          hasMore = false;
+        } else {
+          RecordWritable rowVal = (RecordWritable) value;
+          ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
+
+          // iterate over all column, value pairs from HClient
+          Iterator<Text> colIter = m_row.keySet().iterator();
+          while(colIter.hasNext()) {
+            Text col = colIter.next();
+            byte[] val = (byte[]) m_row.get(col);
+            HStoreKey keyCol = new HStoreKey();
+            keyCol.setColumn(col);
+            columns.add(new KeyedData(keyCol, val));
+          }
+
+          // set the output
+          rowVal.set(columns.toArray(new KeyedData[columns.size()]));
+        }
+      }
+      LOG.info("end next");
+      return hasMore;
+    }
+
+  }
+
+  public RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit)split;
+    return new TableRecordReader(tSplit.getStartRow(),
+        tSplit.getEndRow());
+  }
+
+  /**
+   * A split will be created for each HRegion of the input table
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+      throws IOException {
+    LOG.info("start getSplits");
+
+    Text[] startKeys = m_client.getStartKeys(m_tableName);
+    if(startKeys == null || startKeys.length == 0)
+      throw new IOException("Expecting at least one region");
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for(int i = 0; i < startKeys.length; i++) {
+      Text start = null;
+      Text end = null;
+      if(i > 0) {
+        start = startKeys[i];
+      }
+      if( (i+1) < startKeys.length) {
+        end = startKeys[i+1];
+      }
+      splits[i] = new TableSplit(m_tableName, start, end);
+      LOG.info("split: " + i + "->" + splits[i]);
+    }
+    LOG.info("end splits");
+    return splits;
+  }
+
+  public void configure(JobConf job)
+  {
+    LOG.info("start configure");
+    Path[] tableNames = job.getInputPaths();
+    m_tableName = new Text(tableNames[0].getName());
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    m_cols = new Text[colNames.length];
+    for(int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = new Text(colNames[i]);
+    }
+    HBaseConfiguration hbaseConf = new HBaseConfiguration();
+    m_client = new HClient(hbaseConf/**job*/);
+    try {
+      m_client.enableTable(m_tableName);
+      m_client.openTable(m_tableName);
+    } catch(Exception e) { LOG.error("Failed to open table " + m_tableName, e); }
+    LOG.info("end configure");
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+
+    // expecting exactly one path
+    Path[] tableNames = job.getInputPaths();
+    if(tableNames == null || tableNames.length != 1)
+      throw new IOException("expecting exactly one table name");
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if(colArg == null || colArg.length() == 0)
+      throw new IOException("expecting at least one column");
+  }
+
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/GroupingTableMap.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/GroupingTableMap.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/GroupingTableMap.java (revision 70)
@@ -0,0 +1,114 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import
org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.KeyedData; +import org.apache.hadoop.io.RecordWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + + +/** + * Extract grouping columns from input record + * + */ +public class GroupingTableMap extends TableMap { + + public static final String GROUP_COLUMNS = "groupingtablemap.columns"; + private Text[] m_columns; + + public GroupingTableMap() { super(); } + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + */ + public static void initJob(String table, String columns, String groupColumns, + Class mapper, JobConf job) { + initJob(table, columns, mapper, job); + job.set(GROUP_COLUMNS, groupColumns); + } + + @Override + public void configure(JobConf job) { + super.configure(job); + String[] cols = job.get(GROUP_COLUMNS, "").split(" "); + m_columns = new Text[cols.length]; + for(int i = 0; i < cols.length; i++) { + m_columns[i] = new Text(cols[i]); + } + } + + /** + * Extract the grouping columns from value to construct a new key. + * Pass the new key and value to reduce. + * If any of the grouping columns are not found in the value, the record is skipped. + */ + @Override + public void map(HStoreKey key, RecordWritable value, + TableOutputCollector output, Reporter reporter) throws IOException { + BytesWritable[] keyVals = extractKeyValues(value); + if(keyVals != null) { + Text tKey = createGroupKey(keyVals); + output.collect(tKey, value); + } + } + + /** + * Extract columns values from the current record. This method returns + * null if any of the columns are not found. + * Override this method if you want to deal with nulls differently. + * + * @param r + * @return + */ + protected BytesWritable[] extractKeyValues(RecordWritable r) + { + BytesWritable[] keyVals = null; + ArrayList foundList = new ArrayList(); + int numCols = m_columns.length; + if(numCols > 0) { + KeyedData[] recVals = r.get(); + for(int i = 0; i < numCols; i++) { + boolean found = false; + for(int j = 0; j < recVals.length; j++) { + if(recVals[j].getKey().getColumn().equals(m_columns[i])) { + found = true; + BytesWritable val = new BytesWritable(recVals[j].getData()); + foundList.add(val); + break; + } + } + if(!found) + break; + } + if(foundList.size() == numCols) { + keyVals = foundList.toArray(new BytesWritable[numCols]); + } + } + return keyVals; + } + + /** + * Create a key by concatenating multiple column values. + * Override this function in order to produce different types of keys. 
+ * + * @param vals + * @return + */ + protected Text createGroupKey(BytesWritable[] vals) + { + if(vals == null) return null; + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < vals.length; i++) { + sb.append(new String(vals[i].get())); + if( (i + 1) < vals.length) sb.append(" "); + } + return new Text(sb.toString()); + } + +} Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputFormat.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputFormat.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableOutputFormat.java (revision 70) @@ -0,0 +1,102 @@ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.RecordWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HClient; +import org.apache.hadoop.hbase.KeyedData; +import org.apache.log4j.Logger; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +public class TableOutputFormat extends OutputFormatBase { + + public static final String OUTPUT_TABLE = "hbaseTableOutput"; + + private static final Logger LOG = + Logger.getLogger(TableOutputFormat.class.getName()); + + public TableOutputFormat() { + + } + + /** + * Convert Reduce output (key, value) to (HStoreKey, RecordWritable) + * and write to an HBase table + * + * @author Vuk + * + */ + protected class TableRecordWriter implements RecordWriter + { + private HClient m_client; + + /** + * Instantiate a TableRecordWriter with the HBase HClient for writing. 
+ * + * @param client + */ + public TableRecordWriter(HClient client) + { + m_client = client; + } + + public void close(Reporter reporter) throws IOException { + + } + + /** + * Expect key to be of type Text + * Expect value to be of type RecordWritable + */ + public void write(WritableComparable key, Writable value) throws IOException { + LOG.info("start write"); + Text tKey = (Text)key; + RecordWritable tValue = (RecordWritable) value; + KeyedData[] columns = tValue.get(); + + // start transaction + long xid = m_client.startUpdate(tKey); + for(int i = 0; i < columns.length; i++) { + KeyedData column = columns[i]; + m_client.put(xid, column.getKey().getColumn(), column.getData()); + } + // end transaction + m_client.commit(xid); + + LOG.info("end write"); + } + } + @Override + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { + // expecting exactly one path + LOG.info("start get writer"); + Text tableName = new Text(job.get(OUTPUT_TABLE)); + HClient client = null; + try { + HBaseConfiguration hbConf = new HBaseConfiguration(); + client = new HClient(hbConf/**job*/); + client.enableTable(tableName); + client.openTable(tableName); + } catch(Exception e) {e.printStackTrace();} + LOG.info("end get writer"); + return new TableRecordWriter(client); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { + // TODO Auto-generated method stub + String tableName = job.get(OUTPUT_TABLE); + if(tableName == null) + throw new IOException("Must specify table name"); + } + +} Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableSplit.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableSplit.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableSplit.java (revision 70) @@ -0,0 +1,125 @@ +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +/** + * A table split corresponds to a key range [low, high) + */ +public class TableSplit implements InputSplit { + + private static final int LEFT_OPEN = 0; + private static final int CLOSED = 1; + private static final int RIGHT_OPEN = 2; + private static final int OPEN = 3; + + private Text m_tableName; + private Text m_startRow; + private Text m_endRow; + + public TableSplit() { } + + /** + * + * @param tableName + * @param startRow + * @param endRow + * @throws IOException + */ + public TableSplit(Text tableName, Text startRow, Text endRow) + throws IOException + { + if(tableName == null) throw new IOException("Expected non-null table name"); + m_tableName = tableName; + m_startRow = startRow; + m_endRow = endRow; + } + + /** + * + * @return + */ + public Text getTableName() + { + return m_tableName; + } + + /** + * + * @return + */ + public Text getStartRow() + { + return m_startRow; + } + + /** + * @return + */ + public Text getEndRow() + { + return m_endRow; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.InputSplit#getLength() + */ + public long getLength() throws IOException { + // Not clear how to obtain this... 
seems to be used only for sorting splits
+    return 0;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.InputSplit#getLocations()
+   */
+  public String[] getLocations() throws IOException {
+    // Region locations are not tracked yet, so no host hints are returned
+    return new String[] { };
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    m_tableName = new Text();
+    m_tableName.readFields(in);
+    int rangeType = in.readInt();
+    if(rangeType == LEFT_OPEN) {
+      m_startRow = null;
+      m_endRow = new Text();
+      m_endRow.readFields(in);
+    } else if(rangeType == RIGHT_OPEN) {
+      m_startRow = new Text();
+      m_startRow.readFields(in);
+      m_endRow = null;
+    } else if(rangeType == CLOSED){
+      m_startRow = new Text();
+      m_endRow = new Text();
+      m_startRow.readFields(in);
+      m_endRow.readFields(in);
+    } else if(rangeType != OPEN){
+      throw new IOException("Invalid range type: " + rangeType);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    m_tableName.write(out);
+    if(m_startRow == null && m_endRow == null)
+      out.writeInt(OPEN);
+    else if(m_startRow == null)
+      out.writeInt(LEFT_OPEN);
+    else if(m_endRow == null)
+      out.writeInt(RIGHT_OPEN);
+    else
+      out.writeInt(CLOSED);
+    if(m_startRow != null)
+      m_startRow.write(out);
+    if(m_endRow != null)
+      m_endRow.write(out);
+  }
+
+  public String toString()
+  {
+    return m_tableName + "," + m_startRow + "," + m_endRow;
+  }
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableMap.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableMap.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableMap.java (revision 70)
@@ -0,0 +1,87 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.RecordWritable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Scan an HBase table and call a user-defined map function for each row.
+ * Subclasses decide which records, if any, are passed on to Reduce.
+ *
+ */
+public abstract class TableMap extends MapReduceBase implements Mapper {
+
+  private static final Logger LOG =
+    Logger.getLogger(TableMap.class.getName());
+
+  private TableOutputCollector m_collector;
+
+  public TableMap() {
+    m_collector = new TableOutputCollector();
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table
+   * @param columns
+   * @param mapper
+   * @param job
+   */
+  public static void initJob(String table, String columns,
+      Class mapper, JobConf job) {
+    job.setInputFormat(TableInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(RecordWritable.class);
+    job.setMapperClass(mapper);
+    job.setInputPath(new Path(table));
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+  }
+
+  /**
+   * Input:
+   * The key is of type HStoreKey
+   * The value is of type RecordWritable
+   *
+   * Output:
+   * The key is a Text chosen by the subclass (e.g. the row key or selected column values)
+   * The value is of type RecordWritable
+   */
+  public void map(WritableComparable key, Writable value,
+      OutputCollector output, Reporter reporter) throws IOException {
+    LOG.info("start map");
+    if(m_collector.collector == null) m_collector.collector = output;
+    map( (HStoreKey)key, (RecordWritable)value, m_collector, reporter);
+    LOG.info("end map");
+  }
+
+  /**
+   * Call a user defined function on a single HBase record, represented
+   * by a key and its associated record value.
+   *
+   * @param key
+   * @param value
+   * @param output
+   * @param reporter
+   * @throws IOException
+   */
+  public abstract void map(HStoreKey key, RecordWritable value,
+      TableOutputCollector output, Reporter reporter) throws IOException;
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableReduce.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableReduce.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/IdentityTableReduce.java (revision 70)
@@ -0,0 +1,30 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.RecordWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Reporter;
+
+
+/**
+ * Write to table each key, record pair
+ */
+public class IdentityTableReduce extends TableReduce {
+
+  public IdentityTableReduce() { super(); }
+
+  /**
+   * No aggregation, output pairs of (key, record)
+   */
+  @Override
+  public void reduce(Text key, Iterator values,
+      TableOutputCollector output, Reporter reporter) throws IOException {
+    while(values.hasNext()) {
+      RecordWritable r = (RecordWritable)values.next();
+      output.collect(key, r);
+    }
+  }
+
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableJobExample.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableJobExample.java (revision 0)
+++ src/contrib/hbase/src/java/org/apache/hadoop/mapred/TableJobExample.java (revision 72)
@@ -0,0 +1,84 @@
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Example Map/Reduce job over HBase tables
+ *
+ */
+public class TableJobExample {
+
+  static String USAGE = "java org.apache.hadoop.mapred.TableJobExample" +
+    "\n\t -src <input table>" +
+    "\n\t [-sort <grouping columns>]" +
+    "\n\t -proj <projected columns>" +
+    "\n\t -tgt <output table>";
+
+  static void printUsage(String msg)
+  {
+    System.err.println(msg);
+    System.err.println(USAGE);
+    System.exit(0);
+  }
+
+  static HashMap<String, String> parseArgs(String[] args)
+  {
+    HashMap<String, String> map = new HashMap<String, String>();
+    for(int i = 0; i < args.length; i++) {
+      if( ("-src".equals(args[i])) ||
("-tgt".equals(args[i])) || + ("-proj".equals(args[i])) || + ("-sort".equals(args[i])) ) { + map.put(args[i], args[++i]); + } else { + printUsage("Unsupported argument: " + args[i]); + } + } + return map; + } + + /** + * @param args + */ + public static void main(String[] args) + throws Exception + { + if( (args.length %2) != 0 ) printUsage("Incorrect number of arguments"); + HashMap argMap = parseArgs(args); + + String srcTable = argMap.get("-src"); + String tgtTable = argMap.get("-tgt"); + String proj = argMap.get("-proj"); + String sort = argMap.get("-sort"); + + if(srcTable == null || tgtTable == null || proj == null) printUsage("Required argument not provided"); + + Configuration defaults = new Configuration(); + + JobConf jobConf = new JobConf(defaults, TableJobExample.class); + jobConf.setJobName("table sorter"); + + if(sort != null) + GroupingTableMap.initJob(srcTable, proj, sort, GroupingTableMap.class, jobConf); + else + IdentityTableMap.initJob(srcTable, proj, IdentityTableMap.class, jobConf); + IdentityTableReduce.initJob(tgtTable, IdentityTableReduce.class, jobConf); + + JobClient client = new JobClient(jobConf); + + ClusterStatus cluster = client.getClusterStatus(); + int num_maps = cluster.getMapTasks(); + int num_reduces = 1; + + jobConf.setNumMapTasks(num_maps); + jobConf.setNumReduceTasks(num_reduces); + + JobClient.runJob(jobConf); + } + +} Index: src/contrib/hbase/src/java/org/apache/hadoop/io/RecordWritable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/io/RecordWritable.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/io/RecordWritable.java (revision 70) @@ -0,0 +1,69 @@ +package org.apache.hadoop.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyedData; + +/** + * Wraps an array of KeyedData items as a Writable. The array elements + * may be null. 
+ * + * @author Vuk + * + */ +public class RecordWritable implements Writable { + + private final static KeyedData NULL_KEYEDDATA = new KeyedData(); + + private KeyedData[] m_data; + + /** + * Make a record of length 0 + * + */ + public RecordWritable() { + m_data = new KeyedData[0]; + } + + /** + * + * @return + */ + public KeyedData[] get() + { + return m_data; + } + + /** + * + * @param data + */ + public void set(KeyedData[] data) + { + if(data != null) + m_data = data; + } + + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + m_data = new KeyedData[len]; + for(int i = 0; i < len; i++) { + m_data[i] = new KeyedData(); + m_data[i].readFields(in); + } + } + + public void write(DataOutput out) throws IOException { + int len = m_data.length; + out.writeInt(len); + for(int i = 0; i < len; i++) { + if(m_data[i] != null) + m_data[i].write(out); + else + NULL_KEYEDDATA.write(out); + } + } + +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 74) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -1195,7 +1195,7 @@ s.close(); leases.cancelLease(scannerId, scannerId); } - + private static void printUsageAndExit() { printUsageAndExit(null); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (revision 74) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (working copy) @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -967,6 +968,21 @@ return this.tableServers.get(serverKey); } + public synchronized Text[] getStartKeys(Text tableName) throws IOException { + openTable(tableName); + Text[] keys = new Text[tableServers.size()]; + int i = 0; + for(Iterator iter = tableServers.keySet().iterator(); iter.hasNext();) { + if(i < keys.length) { + keys[i++] = iter.next(); + } else { + break; + } + + } + return keys; + } + /* * Clear caches of passed region location, reload servers for the passed * region's table and then ensure region location can be found. Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 74) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -184,7 +184,8 @@ HRegionInfo info = HRegion.getRegionInfo(results); String serverName = HRegion.getServerName(results); long startCode = HRegion.getStartCode(results); - + LOG.info("row: " + info.toString() + ", server: " + serverName + + ", startCode: " + startCode); if(LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + " scanner: " + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 74) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -355,7 +355,7 @@ // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. 
this.desiredMaxFileSize = conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - + // HRegion is ready to go! this.writestate.writesOngoing = false; LOG.info("region " + this.regionInfo.regionName + " available"); @@ -615,7 +615,7 @@ midKey.set(key); } } - + LOG.info("region split max: " + maxSize); return (maxSize > (this.desiredMaxFileSize + (this.desiredMaxFileSize / 2))); } finally { @@ -722,6 +722,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush); } + LOG.info("SPLIT Flushing cache. Number of commits is: " + commitsSinceFlush); flushcache(false); } } @@ -1410,6 +1411,7 @@ } } } + return insertedItem; } Index: src/contrib/hbase/bin/hbase =================================================================== --- src/contrib/hbase/bin/hbase (revision 74) +++ src/contrib/hbase/bin/hbase (working copy) @@ -82,10 +82,10 @@ # CLASSPATH initially contains $HBASE_CONF_DIR # Add HADOOP_CONF_DIR if its been defined. -CLASSPATH="${HBASE_CONF_DIR}" if [ ! "$HADOOP_CONF_DIR" = "" ]; then CLASSPATH="${CLASSPATH}:${HADOOP_CONF_DIR}" fi +CLASSPATH="${CLASSPATH}:${HBASE_CONF_DIR}" CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar # for developers, add hbase and hadoop classes to CLASSPATH @@ -112,13 +112,13 @@ for f in "$HBASE_HOME/hadoop-hbase-*.jar"; do CLASSPATH=${CLASSPATH}:$f; done -for f in "$HADOOP_HOME/build/contrib/hbase/hadoop-hbase-*.jar"; do - CLASSPATH=${CLASSPATH}:$f; -done +if [ -f "$HADOOP_HOME/contrib/hadoop-hbase.jar" ]; then + CLASSPATH=${CLASSPATH}:$HADOOP_HOME/contrib/hadoop-hbase.jar +fi if [ -d "$HADOOP_HOME/webapps" ]; then CLASSPATH=${CLASSPATH}:$HADOOP_HOME fi -for f in $HADOOP_HOME/hadoop-*-core.jar; do +for f in $HADOOP_HOME/hadoop-*.jar; do CLASSPATH=${CLASSPATH}:$f; done