Index: src/examples/org/apache/hama/examples/MatrixMultiplication.java
===================================================================
--- src/examples/org/apache/hama/examples/MatrixMultiplication.java	(revision 732188)
+++ src/examples/org/apache/hama/examples/MatrixMultiplication.java	(working copy)
@@ -39,13 +39,14 @@
     DenseMatrix a = new DenseMatrix(conf, matrixA, false);
     DenseMatrix b = new DenseMatrix(conf, matrixB, false);
-
+    Matrix c;
+
     if (ARGS.size() > 2) {
-      a.blocking_mapred(Integer.parseInt(ARGS.get(2)));
-      b.blocking_mapred(Integer.parseInt(ARGS.get(2)));
+      c = a.mult(b, Integer.parseInt(ARGS.get(2)));
+    } else {
+      c = a.mult(b);
     }
-    Matrix c = a.mult(b);
 
     for (int i = 0; i < 2; i++) {
       for (int j = 0; j < 2; j++) {
         System.out.println("c(" + i + ", " + j + ") : " + c.get(i, j));
Index: src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
===================================================================
--- src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(revision 732267)
+++ src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(working copy)
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -38,18 +37,8 @@
 public class BlockCyclicMultiplyMap extends MapReduceBase implements
     Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
-  protected HTable table;
-  public static final String MATRIX_B = "hama.multiplication.matrix.b";
 
-  public void configure(JobConf job) {
-    try {
-      table = new HTable(job.get(MATRIX_B, ""));
-    } catch (IOException e) {
-      LOG.warn("Load matrix_b failed : " + e.getMessage());
-    }
-  }
-
-  public static void initJob(String matrix_a, String matrix_b, int block_size,
+  public static void initJob(String matrix_a,
       Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
       Class<BlockWritable> outputValueClass, JobConf jobConf) {
@@ -56,7 +45,6 @@
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
-    jobConf.set(MATRIX_B, matrix_b);
 
     jobConf.setInputFormat(BlockInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
@@ -62,7 +50,6 @@
     FileInputFormat.addInputPaths(jobConf, matrix_a);
 
     jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
-    jobConf.set(BlockInputFormat.REPEAT_NUM, String.valueOf(block_size));
   }
 
   @Override
@@ -69,13 +56,7 @@
   public void map(BlockID key, BlockWritable value,
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
-    SubMatrix a = value.get();
-    SubMatrix b = new SubMatrix(table.get(
-        new BlockID(key.getColumn(), BlockInputFormat.getRepeatCount())
-            .toString(), Constants.BLOCK).getValue());
-    SubMatrix c = a.mult(b);
-    output.collect(
-        new BlockID(key.getRow(), BlockInputFormat.getRepeatCount()),
-        new BlockWritable(c));
+    SubMatrix c = value.getA().mult(value.getB());
+    output.collect(key, new BlockWritable(c));
   }
 }
Index: src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
===================================================================
--- src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java	(revision 732188)
+++ src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java	(working copy)
@@ -64,7 +64,7 @@
     SubMatrix s = null;
 
     while (values.hasNext()) {
-      SubMatrix b = values.next().get();
+      SubMatrix b = values.next().getMatrices().next();
       if (s == null) {
         s = b;
       } else {
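Driver-side, the effect of this change is that callers no longer pre-block the operands themselves: mult(b) runs the row-wise SIMD job, mult(b, blocks) runs the two blocking passes and then the block-cyclic job. A usage sketch of the new call sequence (sizes are illustrative; this needs a running Hama/HBase cluster, so it is a sketch rather than a standalone program):

import java.io.IOException;
import org.apache.hama.DenseMatrix;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.Matrix;

public class MultUsageSketch {
  public static void main(String[] args) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    DenseMatrix a = DenseMatrix.random(conf, 16, 16);
    DenseMatrix b = DenseMatrix.random(conf, 16, 16);

    Matrix c1 = a.mult(b);     // SIMD (row-by-row) map/reduce job
    Matrix c2 = a.mult(b, 16); // blocking passes for a and b, then the block-cyclic job
    System.out.println(c2.get(0, 0));
  }
}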
Index: src/java/org/apache/hama/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/DenseMatrix.java	(revision 732218)
+++ src/java/org/apache/hama/DenseMatrix.java	(working copy)
@@ -28,10 +28,8 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -383,8 +381,31 @@
   }
 
   public Matrix mult(Matrix B) throws IOException {
-    Matrix result = new DenseMatrix(config);
+    Matrix result = new DenseMatrix(config);
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+        SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+        jobConf);
+    SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+        jobConf);
+    JobManager.execute(jobConf, result);
+    return result;
+  }
+
+  public Matrix mult(Matrix B, int blocks) throws IOException {
+    Matrix collectionTable = new DenseMatrix(config);
+
+    blocking_mapred(this, collectionTable, blocks, true);
+    blocking_mapred(B, collectionTable, blocks, false);
+
+    Matrix result = new DenseMatrix(config);
 
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("multiplication MR job : " + result.getPath());
@@ -390,21 +411,12 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
-      BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(),
-          ((DenseMatrix) B).getBlockedMatrixPath(), this.getBlockedMatrixSize(),
-          BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
-          jobConf);
-      BlockCyclicMultiplyReduce.initJob(result.getPath(),
-          BlockCyclicMultiplyReduce.class, jobConf);
-    } else {
-      SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
-          SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
-          jobConf);
-      SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-          jobConf);
-    }
+
+    BlockCyclicMultiplyMap.initJob(collectionTable.getPath(),
+        BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
+        jobConf);
+    BlockCyclicMultiplyReduce.initJob(result.getPath(),
+        BlockCyclicMultiplyReduce.class, jobConf);
 
     JobManager.execute(jobConf, result);
     return result;
@@ -474,22 +486,6 @@
     return result;
   }
 
-  public boolean isBlocked() throws IOException {
-    return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
-        : true;
-  }
-
-  public SubMatrix getBlock(int i, int j) throws IOException {
-    return new SubMatrix(table.get(new BlockID(i, j).getBytes(),
-        Bytes.toBytes(Constants.BLOCK)).getValue());
-  }
-
-  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
-    table.commit(update);
-  }
-
   /**
    * Using a map/reduce job to block a dense matrix.
   *
@@ -496,7 +492,8 @@
   * @param blockNum
   * @throws IOException
   */
-  public void blocking_mapred(int blockNum) throws IOException {
+  public void blocking_mapred(Matrix resource, Matrix collectionTable,
+      int blockNum, boolean bool) throws IOException {
     double blocks = Math.pow(blockNum, 0.5);
     if (!String.valueOf(blocks).endsWith(".0"))
       throw new IOException("can't divide.");
@@ -502,9 +499,10 @@
       throw new IOException("can't divide.");
 
     int block_size = (int) blocks;
-    Matrix blockedMatrix = new DenseMatrix(config);
-    blockedMatrix.setDimension(block_size, block_size);
-    this.setBlockedMatrixPath(blockedMatrix.getPath(), block_size);
+    collectionTable.setDimension(block_size, block_size);
+
+    // Need?
+    // this.setBlockedMatrixPath(collectionTable.getPath(), block_size);
 
     JobConf jobConf = new JobConf(config);
     jobConf.setJobName("Blocking MR job" + getPath());
@@ -512,25 +510,8 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-    BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(),
-        block_size, this.getRows(), this.getColumns(), jobConf);
+    BlockingMapRed.initJob(resource.getPath(), collectionTable.getPath(),
+        bool, block_size, this.getRows(), this.getColumns(), jobConf);
 
     JobManager.execute(jobConf);
   }
-
-  public String getBlockedMatrixPath() throws IOException {
-    return Bytes.toString(table.get(Constants.METADATA,
-        Constants.BLOCK_PATH).getValue());
-  }
-
-  protected void setBlockedMatrixPath(String path, int size) throws IOException {
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
-    update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
-    table.commit(update);
-  }
-
-  public int getBlockedMatrixSize() throws IOException {
-    return Bytes.toInt(table.get(Constants.METADATA,
-        Constants.BLOCK_SIZE).getValue());
-  }
 }
Index: src/java/org/apache/hama/io/BlockID.java
===================================================================
--- src/java/org/apache/hama/io/BlockID.java	(revision 732188)
+++ src/java/org/apache/hama/io/BlockID.java	(working copy)
@@ -34,7 +34,8 @@
   public static final int PAD_SIZE = 15;
   private int row;
   private int column;
-
+  private int seq = -1;
+
   public BlockID() {
   }
@@ -61,7 +62,8 @@
     try {
       this.row = Integer.parseInt(keys[1]);
-      this.column = Integer.parseInt(keys[2]);
+      String[] columns = keys[2].split("[-]");
+      this.column = Integer.parseInt(columns[0]);
     } catch (ArrayIndexOutOfBoundsException e) {
       throw new ArrayIndexOutOfBoundsException(rKey + "\n" + e);
     }
@@ -67,6 +69,11 @@
     }
   }
 
+  public BlockID(int row, int column, int seq) {
+    set(row, column);
+    this.seq = seq;
+  }
+
   public void set(int row, int column) {
     this.row = row;
     this.column = column;
@@ -101,7 +108,11 @@
       buf.append("0");
     }
 
-    return buf.toString() + "," + row + "," + column;
+    if (seq > -1) {
+      return buf.toString() + "," + row + "," + column + "-" + seq;
+    } else {
+      return buf.toString() + "," + row + "," + column;
+    }
   }
 
   @Override
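The seq field only affects the row-key string: a "-seq" suffix is appended to the column part, and the parse path splits it back off, so keys that differ only in seq parse back to the same (row, column). A small sketch of just that encode/parse round trip (the zero-padding prefix is omitted; this is a hypothetical helper, not the BlockID class itself):

public class BlockKeySketch {
  // encode: mirrors BlockID.toString(), minus the padded prefix
  static String encode(int row, int column, int seq) {
    return seq > -1 ? row + "," + column + "-" + seq
                    : row + "," + column;
  }

  // decode: mirrors the new parse path, splitting the seq suffix off the column
  static int[] decode(String key) {
    String[] parts = key.split(",");
    String[] columns = parts[1].split("[-]");
    return new int[] { Integer.parseInt(parts[0]),
                       Integer.parseInt(columns[0]) };
  }

  public static void main(String[] args) {
    String key = encode(2, 5, 37);   // "2,5-37"
    int[] rc = decode(key);          // back to (2, 5); seq is dropped
    System.out.println(key + " -> (" + rc[0] + ", " + rc[1] + ")");
  }
}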
Index: src/java/org/apache/hama/io/BlockWritable.java
===================================================================
--- src/java/org/apache/hama/io/BlockWritable.java	(revision 732188)
+++ src/java/org/apache/hama/io/BlockWritable.java	(working copy)
@@ -19,53 +19,68 @@
  */
 package org.apache.hama.io;
 
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.SubMatrix;
 
 public class BlockWritable implements Writable {
-  private SubMatrix matrix;
+  static final Log LOG = LogFactory.getLog(BlockWritable.class);
+  private List<SubMatrix> matrices;
 
   public BlockWritable() {
-    this.matrix = new SubMatrix(0, 0);
+    this.matrices = new ArrayList<SubMatrix>();
   }
 
-  public BlockWritable(SubMatrix c) {
-    this.matrix = c;
-  }
-
-  public BlockWritable(byte[] bytes) throws IOException {
-    this.matrix = new SubMatrix(bytes);
+  public BlockWritable(SubMatrix subMatrix) {
+    this.matrices = new ArrayList<SubMatrix>();
+    this.matrices.add(subMatrix);
   }
 
   public void readFields(DataInput in) throws IOException {
-
-    int rows = in.readInt();
-    int columns = in.readInt();
-    this.matrix = new SubMatrix(rows, columns);
-
-    for (int i = 0; i < rows; i++) {
-      for (int j = 0; j < columns; j++) {
-        this.matrix.set(i, j, in.readDouble());
+    this.matrices.clear();
+    int size = in.readInt();
+
+    for (int x = 0; x < size; x++) {
+      int rows = in.readInt();
+      int columns = in.readInt();
+
+      SubMatrix matrix = new SubMatrix(rows, columns);
+
+      for (int i = 0; i < rows; i++) {
+        for (int j = 0; j < columns; j++) {
+          matrix.set(i, j, in.readDouble());
+        }
       }
+
+      this.matrices.add(matrix);
     }
-
-    //this.matrix = new SubMatrix(Bytes.readByteArray(in));
   }
 
   public void write(DataOutput out) throws IOException {
-    //Bytes.writeByteArray(out, this.matrix.getBytes());
-
-    out.writeInt(this.matrix.getRows());
-    out.writeInt(this.matrix.getColumns());
-
-    for (int i = 0; i < this.matrix.getRows(); i++) {
-      for (int j = 0; j < this.matrix.getColumns(); j++) {
-        out.writeDouble(this.matrix.get(i, j));
+    Iterator<SubMatrix> it = this.matrices.iterator();
+
+    int size = this.matrices.size();
+    out.writeInt(size);
+
+    while (it.hasNext()) {
+      SubMatrix matrix = it.next();
+
+      out.writeInt(matrix.getRows());
+      out.writeInt(matrix.getColumns());
+
+      for (int i = 0; i < matrix.getRows(); i++) {
+        for (int j = 0; j < matrix.getColumns(); j++) {
+          out.writeDouble(matrix.get(i, j));
+        }
       }
     }
   }
@@ -70,8 +85,24 @@
     }
   }
 
-  public SubMatrix get() {
-    return this.matrix;
+  public void set(byte[] key, byte[] value) throws IOException {
+    int index = 0;
+    if (new String(key).equals(Constants.BLOCK + "b")) {
+      index = 1;
+    }
+
+    this.matrices.add(index, new SubMatrix(value));
   }
-}
+
+  public Iterator<SubMatrix> getMatrices() {
+    return this.matrices.iterator();
+  }
+
+  public SubMatrix getA() {
+    return this.matrices.get(0);
+  }
+
+  public SubMatrix getB() {
+    return this.matrices.get(1);
+  }
+}
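The new wire format is a count followed by (rows, columns, row-major doubles) per sub-matrix. A self-contained round trip of that exact layout with plain java.io streams (flat double[][]s stand in for SubMatrix):

import java.io.*;

public class BlockWireSketch {
  // one sub-matrix on the wire: rows, columns, then every cell as a double
  static void writeOne(DataOutput out, double[][] m) throws IOException {
    out.writeInt(m.length);
    out.writeInt(m[0].length);
    for (double[] row : m)
      for (double v : row)
        out.writeDouble(v);
  }

  public static void main(String[] args) throws IOException {
    double[][] a = {{1, 2}, {3, 4}}, b = {{5, 6}, {7, 8}};

    // write(): the matrix count, then each matrix
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(2);
    writeOne(out, a);
    writeOne(out, b);

    // readFields(): the symmetric loop
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    int size = in.readInt();
    for (int x = 0; x < size; x++) {
      int rows = in.readInt(), cols = in.readInt();
      double first = in.readDouble();                 // cell (0,0)
      for (int i = 1; i < rows * cols; i++) in.readDouble();
      System.out.println(rows + "x" + cols + ", first cell = " + first);
    }
  }
}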
Index: src/java/org/apache/hama/mapred/BlockingMapRed.java
===================================================================
--- src/java/org/apache/hama/mapred/BlockingMapRed.java	(revision 732188)
+++ src/java/org/apache/hama/mapred/BlockingMapRed.java	(working copy)
@@ -29,13 +29,11 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
 import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.VectorWritable;
 
 /**
@@ -45,11 +43,10 @@
   static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
 
   /** Parameter of the path of the matrix to be blocked * */
-  public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
-  public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
   public static final String BLOCK_SIZE = "hama.blocking.size";
   public static final String ROWS = "hama.blocking.rows";
   public static final String COLUMNS = "hama.blocking.columns";
+  public static final String MATRIX_POS = "a.ore.b";
 
   /**
    * Initialize a job to blocking a table
@@ -55,13 +52,14 @@
    * Initialize a job to blocking a table
    *
    * @param matrixPath
-   * @param string
+   * @param collectionTable
+   * @param block_size
    * @param j
    * @param i
-   * @param block_size
    * @param job
    */
-  public static void initJob(String matrixPath, String string, int block_size, int i, int j, JobConf job) {
+  public static void initJob(String matrixPath, String collectionTable, boolean bool,
+      int block_size, int i, int j, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
     job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
@@ -67,17 +65,23 @@
     FileInputFormat.addInputPaths(job, matrixPath);
 
     job.setInputFormat(VectorInputFormat.class);
+
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
-    job.setOutputFormat(NullOutputFormat.class);
-
-    job.set(BLOCKING_MATRIX, matrixPath);
-    job.set(BLOCKED_MATRIX, string);
+
+    job.setOutputFormat(BlockOutputFormat.class);
+    job.setOutputKeyClass(BlockID.class);
+    job.setOutputValueClass(BlockWritable.class);
+
     job.set(BLOCK_SIZE, String.valueOf(block_size));
     job.set(ROWS, String.valueOf(i));
     job.set(COLUMNS, String.valueOf(j));
-
+    job.setBoolean(MATRIX_POS, bool);
     job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+    if (bool)
+      job.set(BlockOutputFormat.COLUMN, "a");
+    else
+      job.set(BlockOutputFormat.COLUMN, "b");
+
+    job.set(BlockOutputFormat.OUTPUT_TABLE, collectionTable);
   }
 
   /**
@@ -84,9 +88,6 @@
    * Abstract Blocking Map/Reduce Class to configure the job.
    */
   public static abstract class BlockingMapRedBase extends MapReduceBase {
-
-    protected DenseMatrix matrix;
-    protected DenseMatrix blockedMatrix;
 
     protected int mBlockNum;
     protected int mBlockRowSize;
     protected int mBlockColSize;
@@ -93,15 +94,11 @@
     protected int mRows;
     protected int mColumns;
+
+    protected boolean matrixPos;
 
     @Override
     public void configure(JobConf job) {
-      try {
-        matrix = new DenseMatrix(new HamaConfiguration(), job.get(
-            BLOCKING_MATRIX, ""));
-        blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
-            BLOCKED_MATRIX, ""));
-
       mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
       mRows = Integer.parseInt(job.get(ROWS, ""));
       mColumns = Integer.parseInt(job.get(COLUMNS, ""));
@@ -108,11 +105,9 @@
 
       mBlockRowSize = mRows / mBlockNum;
       mBlockColSize = mColumns / mBlockNum;
-      } catch (IOException e) {
-        LOG.warn("Load matrix_blocking failed : " + e.getMessage());
-      }
+
+      matrixPos = job.getBoolean(MATRIX_POS, true);
     }
-
   }
 
   /**
@@ -142,7 +137,6 @@
         i++;
       } while (endColumn < (mColumns - 1));
     }
-
   }
 
   /**
@@ -149,14 +143,13 @@
    * Reducer Class
    */
   public static class BlockingReducer extends BlockingMapRedBase implements
-      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
+      Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
 
     @Override
     public void reduce(BlockID key, Iterator<VectorWritable> values,
-        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
+        OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
         throws IOException {
-      // Note: all the sub-vectors are grouped by {@link
-      // org.apache.hama.io.BlockID}
+      // Note: all the sub-vectors are grouped by {@link org.apache.hama.io.BlockID}
 
       // the block's base offset in the original matrix
       int colBase = key.getColumn() * mBlockColSize;
@@ -189,9 +182,24 @@
           subMatrix.set(i, j, vw.get(colBase + j));
         }
       }
-
-      blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+
+      BlockWritable outValue = new BlockWritable(subMatrix);
+
+      // It'll be used only for matrix multiplication.
+      if (matrixPos) {
+        for (int x = 0; x < mBlockNum; x++) {
+          int r = (key.getRow() * mBlockNum) * mBlockNum;
+          int seq = (x * mBlockNum) + key.getColumn() + r;
+          output.collect(new BlockID(key.getRow(), x, seq), outValue);
+        }
+      } else {
+        for (int x = 0; x < mBlockNum; x++) {
+          int seq = (x * mBlockNum * mBlockNum)
+              + (key.getColumn() * mBlockNum) + key.getRow();
+          output.collect(new BlockID(x, key.getColumn(), seq), outValue);
+        }
+      }
+
+      //output.collect(key, new BlockWritable(subMatrix));
     }
   }
-
-}
+}
\ No newline at end of file
Index: src/java/org/apache/hama/mapred/BlockInputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/BlockInputFormat.java	(revision 732267)
+++ src/java/org/apache/hama/mapred/BlockInputFormat.java	(working copy)
@@ -20,6 +20,7 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -24,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.util.Writables;
@@ -34,7 +36,6 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.Constants;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 
@@ -43,14 +44,6 @@
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
 
-  public static int getRepeatCount() {
-    return TableRecordReader.repeatCount;
-  }
-
-  public static int getRepeat() {
-    return repeat;
-  }
-
   /**
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
@@ -56,7 +49,6 @@
    */
   protected static class TableRecordReader extends TableRecordReaderBase
       implements RecordReader<BlockID, BlockWritable> {
-    private static int repeatCount = 0;
 
     /**
      * @return IntWritable
@@ -99,14 +91,6 @@
 
       boolean hasMore = result != null && result.size() > 0;
 
-      // Scanner will be restarted.
-      if (!hasMore && repeatCount < repeat - 1) {
-        this.init();
-        repeatCount++;
-        result = this.scanner.next();
-        hasMore = result != null && result.size() > 0;
-      }
-
       if (hasMore) {
         byte[] row = result.getRow();
         BlockID bID = new BlockID(row);
@@ -112,8 +96,12 @@
         BlockID bID = new BlockID(row);
         lastRow = row;
         key.set(bID.getRow(), bID.getColumn());
-        byte[] rs = result.get(Constants.BLOCK).getValue();
-        Writables.copyWritable(new BlockWritable(rs), value);
+
+        BlockWritable block = new BlockWritable();
+        for (Map.Entry<byte[], Cell> e : result.entrySet()) {
+          block.set(e.getKey(), e.getValue().getValue());
+        }
+        Writables.copyWritable(block, value);
       }
       return hasMore;
     }
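The two seq formulas in BlockingReducer are what line the operands up: block A(i,k) fans out to key (i, x) with seq = i*N*N + x*N + k, and block B(k,j) fans out to key (x, j) with seq = x*N*N + j*N + k, so for every target block (i,j) the A and B blocks that must be multiplied land on the same "(i,j)-seq" row of the collection table. A quick self-contained check of that invariant (plain Java; names are illustrative):

public class SeqPairingCheck {
  public static void main(String[] args) {
    int N = 4; // mBlockNum
    for (int i = 0; i < N; i++)
      for (int j = 0; j < N; j++)
        for (int k = 0; k < N; k++) {
          // A-side: A(i,k) emitted to key (i, x=j); r = i*N*N, seq = x*N + k + r
          int seqA = (j * N) + k + (i * N) * N;
          // B-side: B(k,j) emitted to key (x=i, j); seq = x*N*N + j*N + k
          int seqB = (i * N * N) + (j * N) + k;
          if (seqA != seqB)
            throw new AssertionError("blocks would not pair up");
        }
    System.out.println("every A(i,k)/B(k,j) pair shares one row key");
  }
}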
Index: src/java/org/apache/hama/mapred/BlockOutputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/BlockOutputFormat.java	(revision 0)
+++ src/java/org/apache/hama/mapred/BlockOutputFormat.java	(revision 0)
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+
+public class BlockOutputFormat extends
+    FileOutputFormat<BlockID, BlockWritable> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hama.mapred.output";
+  public static final String COLUMN = "hama.mapred.output.column";
+  private final static Log LOG = LogFactory.getLog(BlockOutputFormat.class);
+
+  /**
+   * Convert Reduce output (key, value) to (BlockID, BlockWritable)
+   * and write to an HBase table
+   */
+  protected static class TableRecordWriter implements
+      RecordWriter<BlockID, BlockWritable> {
+    private HTable m_table;
+    private BatchUpdate update;
+    private String column;
+
+    /**
+     * Instantiate a TableRecordWriter with the HBase HClient for writing.
+     *
+     * @param table
+     * @param col
+     */
+    public TableRecordWriter(HTable table, String col) {
+      m_table = table;
+      column = col;
+    }
+
+    public void close(@SuppressWarnings("unused")
+    Reporter reporter) throws IOException {
+      m_table.flushCommits();
+    }
+
+    /** {@inheritDoc} */
+    public void write(BlockID key, BlockWritable value) throws IOException {
+      Iterator<SubMatrix> it = value.getMatrices();
+      update = new BatchUpdate(key.getBytes());
+      update.put(Bytes.toBytes(Constants.BLOCK + column), it.next().getBytes());
+      m_table.commit(update);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+
+    String column = job.get(COLUMN);
+    String tableName = job.get(OUTPUT_TABLE);
+    HTable table = null;
+    try {
+      table = new HTable(new HBaseConfiguration(job), tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    }
+    return new TableRecordWriter(table, column);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}
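Putting the writer and the reader side together: each blocking job writes its reducer's block into column Constants.BLOCK + "a" or + "b" of the shared collection table, so one "(i,j)-seq" row ends up holding both operands, and BlockInputFormat repopulates a BlockWritable from the row's cells. Since an HBase RowResult iterates its cells in sorted column order, the "a" cell is seen before the "b" cell, which is what keeps getA()/getB() stable. A small sketch of that row layout (strings stand in for the serialized SubMatrix bytes; this mirrors, not reuses, BlockWritable.set()):

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class CollectionRowSketch {
  public static void main(String[] args) {
    // one "(i,j)-k" row of the collection table: two cells, keyed by column
    TreeMap<String, String> row = new TreeMap<String, String>();
    row.put("blockb", "B(k,j)");
    row.put("blocka", "A(i,k)");

    // the sorted map yields "blocka" before "blockb",
    // so slot 0 is always the A block and slot 1 the B block
    List<String> matrices = new ArrayList<String>();
    for (String column : row.keySet()) {
      int index = column.equals("blockb") ? 1 : 0;
      matrices.add(index, row.get(column));
    }
    System.out.println("getA() = " + matrices.get(0)); // A(i,k)
    System.out.println("getB() = " + matrices.get(1)); // B(k,j)
  }
}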
Index: src/java/org/apache/hama/mapred/TableInputFormatBase.java
===================================================================
--- src/java/org/apache/hama/mapred/TableInputFormatBase.java	(revision 732267)
+++ src/java/org/apache/hama/mapred/TableInputFormatBase.java	(working copy)
@@ -40,7 +40,6 @@
   protected byte[][] inputColumns;
   protected HTable table;
   protected RowFilterInterface rowFilter;
-  protected static int repeat;
 
   /**
    * space delimited list of columns
@@ -46,13 +45,9 @@
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-  public static final String REPEAT_NUM = "hama.mapred.repeat";
 
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
-    if (job.get(REPEAT_NUM) != null) {
-      setRepeat(Integer.parseInt(job.get(REPEAT_NUM)));
-    }
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
     byte[][] m_cols = new byte[colNames.length][];
@@ -67,10 +62,6 @@
     }
   }
 
-  private void setRepeat(int parseInt) {
-    repeat = parseInt;
-  }
-
   public void validateInput(JobConf job) throws IOException {
     // expecting exactly one path
     Path[] tableNames = FileInputFormat.getInputPaths(job);
Index: src/java/org/apache/hama/SubMatrix.java
===================================================================
--- src/java/org/apache/hama/SubMatrix.java	(revision 732188)
+++ src/java/org/apache/hama/SubMatrix.java	(working copy)
@@ -209,5 +209,15 @@
     return data;
   }
 
+  public String toString() {
+    String result = "";
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < this.getColumns(); j++) {
+        result += this.get(i, j) + "\t";
+      }
+      result += "\n";
+    }
+    return result;
+  }
 }
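For reference, the property the updated test below exercises, checked here in-process: multiplying blockwise and accumulating the partial blocks gives the same result as the plain product (pure Java, 4x4 matrices in 2x2 blocks; not the map/reduce job itself):

public class BlockEqualsPlainCheck {
  public static void main(String[] args) {
    int SIZE = 4, BLOCK = 2, N = SIZE / BLOCK;
    double[][] a = new double[SIZE][SIZE], b = new double[SIZE][SIZE];
    java.util.Random rand = new java.util.Random(0);
    for (int i = 0; i < SIZE; i++)
      for (int j = 0; j < SIZE; j++) {
        a[i][j] = rand.nextDouble();
        b[i][j] = rand.nextDouble();
      }

    // plain triple loop
    double[][] plain = new double[SIZE][SIZE];
    for (int i = 0; i < SIZE; i++)
      for (int k = 0; k < SIZE; k++)
        for (int j = 0; j < SIZE; j++)
          plain[i][j] += a[i][k] * b[k][j];

    // blockwise: one A(bi,bk) x B(bk,bj) partial product per (bi,bj,bk) triple
    double[][] blocked = new double[SIZE][SIZE];
    for (int bi = 0; bi < N; bi++)
      for (int bj = 0; bj < N; bj++)
        for (int bk = 0; bk < N; bk++)
          for (int i = 0; i < BLOCK; i++)
            for (int k = 0; k < BLOCK; k++)
              for (int j = 0; j < BLOCK; j++)
                blocked[bi * BLOCK + i][bj * BLOCK + j] +=
                    a[bi * BLOCK + i][bk * BLOCK + k]
                  * b[bk * BLOCK + k][bj * BLOCK + j];

    for (int i = 0; i < SIZE; i++)
      for (int j = 0; j < SIZE; j++)
        if (Math.abs(plain[i][j] - blocked[i][j]) > 1e-9)
          throw new AssertionError("mismatch at " + i + "," + j);
    System.out.println("blockwise product matches the plain product");
  }
}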
Index: src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(revision 732267)
+++ src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(working copy)
@@ -23,7 +23,6 @@
 
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
-import org.apache.hama.Matrix;
 import org.apache.log4j.Logger;
 
 public class TestBlockMatrixMapReduce extends HCluster {
@@ -37,13 +36,10 @@
 
   public void testBlockMatrixMapReduce() throws IOException,
       ClassNotFoundException {
-    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
-    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
-    // Partitioning 8 * 8 submatrix. It also the test submatrix() and scanner.
-    ((DenseMatrix) m1).blocking_mapred(16);
-    ((DenseMatrix) m2).blocking_mapred(16);
+    DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
 
-    Matrix c = m1.mult(m2);
+    DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
 
     double[][] mem = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {
Index: src/test/org/apache/hama/TestDenseMatrix.java
===================================================================
--- src/test/org/apache/hama/TestDenseMatrix.java	(revision 732267)
+++ src/test/org/apache/hama/TestDenseMatrix.java	(working copy)
@@ -83,18 +83,6 @@
   }
 
   /**
-   * Map/Reduce Blocking Test
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  public void testMRBlocking() throws IOException, ClassNotFoundException {
-    assertEquals(((DenseMatrix) m2).isBlocked(), false);
-    ((DenseMatrix) m2).blocking_mapred(4);
-    assertEquals(((DenseMatrix) m2).isBlocked(), true);
-  }
-
-  /**
    * Column vector test.
    *
    * @param rand