Index: src/java/org/apache/hama/mapred/DenseInputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/DenseInputFormat.java (revision 0)
+++ src/java/org/apache/hama/mapred/DenseInputFormat.java (revision 0)
@@ -0,0 +1,265 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.util.Numeric;
+
+public class DenseInputFormat extends MatrixInputFormatBase implements
+    JobConfigurable, InputFormat {
+  private final Log LOG = LogFactory.getLog(DenseInputFormat.class);
+  private byte[][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+
+  /**
+   * space delimited list of columns
+   *
+   * @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
+   *      wildcards
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null ||
+        tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+
+  /**
+   * Iterate over HBase table data, returning (IntWritable, DenseVector) pairs
+   */
+  private static class TableRecordReader implements
+      RecordReader {
+    private byte[] startRow;
+    private byte[] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte[][] trrInputColumns;
+
+    /**
+     * Build the scanner. Not done in constructor to allow for extension.
+     *
+     * @throws IOException
+     */
+    public void init() throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set rowFiltersSet = new HashSet();
+          rowFiltersSet.add(new StopRowFilter(endRow));
+          rowFiltersSet.add(trrRowFilter);
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+                  rowFiltersSet));
+        } else {
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              endRow);
+        }
+      } else {
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            trrRowFilter);
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link DenseVector}.
+     */
+    public void setInputColumns(final byte[][] inputColumns) {
+      byte[][] columns = inputColumns;
+      this.trrInputColumns = columns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte[] startRow) {
+      byte[] sRow = startRow;
+      this.startRow = sRow;
+    }
+
+    /**
+     *
+     * @param endRow the last row in the split
+     */
+    public void setEndRow(final byte[] endRow) {
+      byte[] eRow = endRow;
+      this.endRow = eRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      this.scanner.close();
+    }
+
+    /**
+     * @return IntWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return DenseVector
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public DenseVector createValue() {
+      return new DenseVector();
+    }
+
+    /** {@inheritDoc} */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /** {@inheritDoc} */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * @param key IntWritable as input key.
+     * @param value DenseVector as input value
+     *
+     * Converts Scanner.next() to IntWritable, DenseVector
+     *
+     * @return true if there was more data
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    public boolean next(IntWritable key, DenseVector value)
+        throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(Numeric.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader getRecordReader(
+      InputSplit split, @SuppressWarnings("unused") JobConf job,
+      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *          implementations.
+   */
+  private void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+}
Index: src/java/org/apache/hama/mapred/DenseMap.java
===================================================================
--- src/java/org/apache/hama/mapred/DenseMap.java (revision 689018)
+++ src/java/org/apache/hama/mapred/DenseMap.java (working copy)
@@ -47,14 +47,14 @@
       Class outputKeyClass, Class outputValueClass, JobConf job) {
 
-    job.setInputFormat(MatrixInputFormat.class);
+    job.setInputFormat(DenseInputFormat.class);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
 
     FileInputFormat.addInputPaths(job, matrixA);
     MATRIX_B = new DenseMatrix(new HamaConfiguration(), matrixB);
-    job.set(MatrixInputFormat.COLUMN_LIST, Constants.COLUMN);
+    job.set(DenseInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
   public abstract void map(IntWritable key, DenseVector value,
Index: src/java/org/apache/hama/mapred/MatrixInputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/MatrixInputFormat.java (revision 689018)
+++ src/java/org/apache/hama/mapred/MatrixInputFormat.java (working copy)
@@ -27,10 +27,16 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
 
+@Deprecated
 public class MatrixInputFormat extends MatrixInputFormatBase implements
     JobConfigurable {
   private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
@@ -74,4 +80,11 @@
       throw new IOException("expecting at least one column");
     }
   }
+
+  @Override
+  public RecordReader getRecordReader(
+      InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
Index: src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
===================================================================
--- src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (revision 689018)
+++ src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (working copy)
@@ -20,197 +20,28 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.filter.RowFilterSet;
-import org.apache.hadoop.hbase.filter.StopRowFilter;
-import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.DenseVector;
-import org.apache.hama.util.Numeric;
 
 public abstract class MatrixInputFormatBase implements InputFormat {
   private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
   private byte[][] inputColumns;
   private HTable table;
-  private TableRecordReader tableRecordReader;
-  private RowFilterInterface rowFilter;
-
+  protected RowFilterInterface rowFilter;
+
   /**
-   * Iterate over an HBase table data, return (Text, DenseVector) pairs
-   */
-  protected static class TableRecordReader implements
-      RecordReader {
-    private byte[] startRow;
-    private byte[] endRow;
-    private RowFilterInterface trrRowFilter;
-    private Scanner scanner;
-    private HTable htable;
-    private byte[][] trrInputColumns;
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException
-     */
-    public void init() throws IOException {
-      if ((endRow != null) && (endRow.length > 0)) {
-        if (trrRowFilter != null) {
-          final Set rowFiltersSet = new HashSet();
-          rowFiltersSet.add(new StopRowFilter(endRow));
-          rowFiltersSet.add(trrRowFilter);
-          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
-              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
-                  rowFiltersSet));
-        } else {
-          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
-              endRow);
-        }
-      } else {
-        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
-            trrRowFilter);
-      }
-    }
-
-    /**
-     * @param htable the {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * @param inputColumns the columns to be placed in {@link DenseVector}.
-     */
-    public void setInputColumns(final byte[][] inputColumns) {
-      byte[][] columns = inputColumns;
-      this.trrInputColumns = columns;
-    }
-
-    /**
-     * @param startRow the first row in the split
-     */
-    public void setStartRow(final byte[] startRow) {
-      byte[] sRow = startRow;
-      this.startRow = sRow;
-    }
-
-    /**
-     *
-     * @param endRow the last row in the split
-     */
-    public void setEndRow(final byte[] endRow) {
-      byte[] eRow = endRow;
-      this.endRow = eRow;
-    }
-
-    /**
-     * @param rowFilter the {@link RowFilterInterface} to be used.
-     */
-    public void setRowFilter(RowFilterInterface rowFilter) {
-      this.trrRowFilter = rowFilter;
-    }
-
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      this.scanner.close();
-    }
-
-    /**
-     * @return ImmutableBytesWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createKey()
-     */
-    public IntWritable createKey() {
-      return new IntWritable();
-    }
-
-    /**
-     * @return DenseVector
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createValue()
-     */
-    public DenseVector createValue() {
-      return new DenseVector();
-    }
-
-    /** {@inheritDoc} */
-    public long getPos() {
-      // This should be the ordinal tuple in the range;
-      // not clear how to calculate...
-      return 0;
-    }
-
-    /** {@inheritDoc} */
-    public float getProgress() {
-      // Depends on the total number of tuples and getPos
-      return 0;
-    }
-
-    /**
-     * @param key HStoreKey as input key.
-     * @param value MapWritable as input value
-     *
-     * Converts Scanner.next() to Text, DenseVector
-     *
-     * @return true if there was more data
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    public boolean next(IntWritable key, DenseVector value)
-        throws IOException {
-      RowResult result = this.scanner.next();
-      boolean hasMore = result != null && result.size() > 0;
-      if (hasMore) {
-        key.set(Numeric.bytesToInt(result.getRow()));
-        Writables.copyWritable(result, value);
-      }
-      return hasMore;
-    }
-  }
-
-  /**
-   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
-   * default.
-   *
-   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
-   *      JobConf, Reporter)
-   */
-  public RecordReader getRecordReader(
-      InputSplit split, @SuppressWarnings("unused") JobConf job,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
-    TableSplit tSplit = (TableSplit) split;
-    TableRecordReader trr = this.tableRecordReader;
-    // if no table record reader was provided use default
-    if (trr == null) {
-      trr = new TableRecordReader();
-    }
-    trr.setStartRow(tSplit.getStartRow());
-    trr.setEndRow(tSplit.getEndRow());
-    trr.setHTable(this.table);
-    trr.setInputColumns(this.inputColumns);
-    trr.setRowFilter(this.rowFilter);
-    trr.init();
-    return trr;
-  }
-
-  /**
   * Calculates the splits that will serve as input for the map tasks.
   *
   * Splits are created in number equal to the smallest between numSplits and
@@ -280,16 +111,6 @@
   }
 
   /**
-   * Allows subclasses to set the {@link TableRecordReader}.
-   *
-   * @param tableRecordReader to provide other {@link TableRecordReader}
-   *          implementations.
-   */
-  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
-    this.tableRecordReader = tableRecordReader;
-  }
-
-  /**
   * Allows subclasses to set the {@link RowFilterInterface} to be used.
   *
   * @param rowFilter