Index: conf/hama-env.sh
===================================================================
--- conf/hama-env.sh	(revision 730068)
+++ conf/hama-env.sh	(working copy)
@@ -18,23 +18,24 @@
 # * See the License for the specific language governing permissions and
 # * limitations under the License.
 # */
-
 # Set environment variables here.
 
 # The java implementation to use. Required.
-# export JAVA_HOME=/usr/lib/j2sdk1.5-sun
+export JAVA_HOME=/usr/java/jdk1.6.0_06
 
 # hadoop conf dir. to find the mapreduce cluster.
-# export HADOOP_CONF_DIR=
+export HADOOP_CONF_DIR=/root/hadoop-0.19.0/conf
 
 # hbase conf dir. to find the hbase cluster.
-# export HBASE_CONF_DIR=
+export HBASE_CONF_DIR=/root/hbase-trunk/conf
 
 # Extra Java CLASSPATH elements. Optional.
-# export HAMA_CLASSPATH=$HADOOP_CONF_DIR:$HBASE_CONF_DIR
+export HAMA_CLASSPATH=$HADOOP_CONF_DIR:$HBASE_CONF_DIR
 
 # The maximum amount of heap to use, in MB. Default is 1000.
-# export HBASE_HEAPSIZE=1000
+export HBASE_HEAPSIZE=2000
 
 # Where log files are stored. $HAMA_HOME/logs by default.
-# export HAMA_LOG_DIR=${HAMA_HOME}/logs
+export HAMA_LOG_DIR=${HAMA_HOME}/logs
+
+export HADOOP_HEAPSIZE=3000
Index: src/test/org/apache/hama/TestDenseMatrix.java
===================================================================
--- src/test/org/apache/hama/TestDenseMatrix.java	(revision 730068)
+++ src/test/org/apache/hama/TestDenseMatrix.java	(working copy)
@@ -28,7 +28,6 @@
 import junit.framework.TestSuite;
 
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.log4j.Logger;
 
@@ -83,25 +82,6 @@
     assertEquals(m1.get(1, 1), origin + 0.5);
   }
 
-  public void testBlocking() throws IOException, ClassNotFoundException {
-    assertEquals(((DenseMatrix) m1).isBlocked(), false);
-    ((DenseMatrix) m1).blocking(4);
-    assertEquals(((DenseMatrix) m1).isBlocked(), true);
-    BlockPosition pos = ((DenseMatrix) m1).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m1).subMatrix(pos.getStartRow(),
-        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
-        .getDoubleArray();
-    double[][] c = ((DenseMatrix) m1).getBlock(1, 0).getDoubleArray();
-    assertEquals(((DenseMatrix) m1).getBlockSize(), 2);
-    assertEquals(c.length, 5);
-
-    for (int i = 0; i < b.length; i++) {
-      for (int j = 0; j < b.length; j++) {
-        assertEquals(b[i][j], c[i][j]);
-      }
-    }
-  }
-
   /**
    * Map/Reduce Blocking Test
    *
@@ -112,19 +92,6 @@
     assertEquals(((DenseMatrix) m2).isBlocked(), false);
     ((DenseMatrix) m2).blocking_mapred(4);
     assertEquals(((DenseMatrix) m2).isBlocked(), true);
-    BlockPosition pos = ((DenseMatrix) m2).getBlockPosition(1, 0);
-    double[][] b = ((DenseMatrix) m2).subMatrix(pos.getStartRow(),
-        pos.getEndRow(), pos.getStartColumn(), pos.getEndColumn())
-        .getDoubleArray();
-    double[][] c = ((DenseMatrix) m2).getBlock(1, 0).getDoubleArray();
-    assertEquals(((DenseMatrix) m2).getBlockSize(), 2);
-    assertEquals(c.length, 5);
-
-    for (int i = 0; i < b.length; i++) {
-      for (int j = 0; j < b.length; j++) {
-        assertEquals(b[i][j], c[i][j]);
-      }
-    }
   }
 
   /**
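With the scanner-based blocking() gone (see the DenseMatrix change below), TestDenseMatrix keeps only the map/reduce blocking test, and TestBlockMatrixMapReduce now drives the multiplication through the public mult() API instead of wiring up its own job. A rough sketch of the surviving call sequence; DenseMatrix.random() is an assumption here, borrowed from how the other Hama tests build fixtures:

    // Sketch (not part of the patch): block two matrices, then multiply.
    DenseMatrix m1 = DenseMatrix.random(conf, 32, 32);
    DenseMatrix m2 = DenseMatrix.random(conf, 32, 32);
    m1.blocking_mapred(16); // sqrt(16) = 4, so a 4 x 4 block grid
    m2.blocking_mapred(16);
    Matrix c = m1.mult(m2); // dispatches to BlockCyclicMultiplyMap/Reduce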
Index: src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(revision 730068)
+++ src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(working copy)
@@ -21,20 +21,13 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
 import org.apache.hama.Matrix;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
 import org.apache.log4j.Logger;
 
 public class TestBlockMatrixMapReduce extends HCluster {
   static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
-  static Matrix c;
   static final int SIZE = 32;
 
   /** constructor */
@@ -50,41 +43,22 @@
     ((DenseMatrix) m1).blocking_mapred(16);
     ((DenseMatrix) m2).blocking_mapred(16);
 
-    miniMRJob(m1.getPath(), m2.getPath());
+    Matrix c = m1.mult(m2);
 
-    double[][] C = new double[SIZE][SIZE];
+    double[][] mem = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
         for (int k = 0; k < SIZE; k++) {
-          C[i][k] += m1.get(i, j) * m2.get(j, k);
+          mem[i][k] += m1.get(i, j) * m2.get(j, k);
         }
       }
     }
 
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(C[i][j]).substring(0, 5), String.valueOf(
+        assertEquals(String.valueOf(mem[i][j]).substring(0, 5), String.valueOf(
             c.get(i, j)).substring(0, 5));
       }
     }
   }
-
-  private void miniMRJob(String string, String string2) throws IOException {
-    c = new DenseMatrix(conf);
-    String output = c.getPath();
-
-    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
-    jobConf.setJobName("test MR job");
-
-    BlockCyclicMultiplyMap.initJob(string, string2,
-        BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
-        jobConf);
-    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
-        jobConf);
-
-    jobConf.setNumMapTasks(2);
-    jobConf.setNumReduceTasks(2);
-
-    JobClient.runJob(jobConf);
-  }
 }
Index: src/java/org/apache/hama/Constants.java
===================================================================
--- src/java/org/apache/hama/Constants.java	(revision 730068)
+++ src/java/org/apache/hama/Constants.java	(working copy)
@@ -98,4 +98,6 @@
 
   /** block size */
   public static final String BLOCK_SIZE = "attribute:blockSize";
+
+  public static final String BLOCK_PATH = "attribute:blockPath";
 }
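The new BLOCK_PATH attribute lives in the matrix's METADATA row and points at the companion table that holds the blocks; isBlocked() and getBlockedMatrixPath() in the DenseMatrix change below read it. A minimal sketch of the read side, assuming the HBase 0.19 Cell API that the rest of the patch already uses:

    // Returns null when the matrix has never been blocked.
    Cell cell = table.get(Constants.METADATA, Constants.BLOCK_PATH);
    String blockedPath = (cell == null) ? null : Bytes.toString(cell.getValue());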
Index: src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
===================================================================
--- src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(revision 730068)
+++ src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(working copy)
@@ -21,6 +21,10 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -69,13 +73,25 @@
   public void map(BlockID key, BlockWritable value,
       OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
       throws IOException {
-    int blockSize = matrix_b.getBlockSize();
+    // matrix_b is the blocked copy of B, so its row count is the width of
+    // the block grid.
+    int blockSize = matrix_b.getRows();
     SubMatrix a = value.get();
-
-    for (int j = 0; j < blockSize; j++) {
-      SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
+    HTable table = matrix_b.getHTable();
+
+    // Scan the block row of B that this block of A multiplies:
+    //   startKey : new BlockID(key.getColumn(), 0)
+    //   endKey   : new BlockID(key.getColumn(), blockSize + 1)
+    Scanner scan = table.getScanner(new byte[][] { Bytes
+        .toBytes(Constants.BLOCK) },
+        new BlockID(key.getColumn(), 0).getBytes(), new BlockID(
+            key.getColumn(), blockSize + 1).getBytes());
+
+    for (RowResult row : scan) {
+      BlockID bid = new BlockID(row.getRow());
+      SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
       SubMatrix c = a.mult(b);
-      output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
+      output.collect(new BlockID(key.getRow(), bid.getColumn()),
+          new BlockWritable(c));
     }
+    scan.close();
   }
 }
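Each map call now holds one block A[i][k] and scans the k-th block row of B straight out of the blocked table, instead of issuing blockSize separate getBlock() calls. For a 2 x 2 block grid the data flow of a single call looks like this; BlockCyclicMultiplyReduce, unchanged by this patch, sums the partial products per output key:

    // input key/value : BlockID(0, 1) / A[0][1]
    // scan of B's block row 1 yields B[1][0], B[1][1]
    // emitted pairs:
    //   ( BlockID(0, 0), A[0][1] * B[1][0] )
    //   ( BlockID(0, 1), A[0][1] * B[1][1] )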
Index: src/java/org/apache/hama/SubMatrix.java
===================================================================
--- src/java/org/apache/hama/SubMatrix.java	(revision 730068)
+++ src/java/org/apache/hama/SubMatrix.java	(working copy)
@@ -19,11 +19,12 @@
  */
 package org.apache.hama;
 
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
@@ -59,15 +60,19 @@
 
   public SubMatrix(byte[] matrix) throws IOException {
     ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
-    try {
-      obj = oos.readObject();
-      this.matrix = ((SubMatrix)obj).getDoubleArray();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
+    DataInputStream dis = new DataInputStream(bos);
+
+    int rows = dis.readInt();
+    int columns = dis.readInt();
+    this.matrix = new double[rows][columns];
+
+    for (int i = 0; i < rows; i++) {
+      for (int j = 0; j < columns; j++) {
+        this.matrix[i][j] = dis.readDouble();
+      }
     }
-    oos.close();
+
+    dis.close();
     bos.close();
   }
 
@@ -187,13 +192,22 @@
    */
   public byte[] getBytes() throws IOException {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
+    DataOutputStream dos = new DataOutputStream(bos);
+
+    dos.writeInt(this.getRows());
+    dos.writeInt(this.getColumns());
+
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < this.getColumns(); j++) {
+        dos.writeDouble(this.get(i, j));
+      }
+    }
+
+    byte[] data = bos.toByteArray();
+    dos.close();
     bos.close();
-    byte[] data = bos.toByteArray();
     return data;
   }
 }
+
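SubMatrix now serializes as two ints (the dimensions) followed by the cells as raw doubles, so a rows x columns block costs 8 + 8 * rows * columns bytes instead of carrying Java object-serialization overhead. A round-trip sketch using the existing set()/get() accessors, assuming SubMatrix(int, int) zero-initializes the array:

    SubMatrix a = new SubMatrix(2, 2);
    a.set(0, 0, 1.0); a.set(0, 1, 2.0);
    a.set(1, 0, 3.0); a.set(1, 1, 4.0);
    byte[] raw = a.getBytes();      // 2 * 4 + 4 * 8 = 40 bytes
    SubMatrix b = new SubMatrix(raw);
    assert b.get(1, 1) == 4.0;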
Index: src/java/org/apache/hama/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/DenseMatrix.java	(revision 730068)
+++ src/java/org/apache/hama/DenseMatrix.java	(working copy)
@@ -395,7 +395,8 @@
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
     if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
-      BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+      BlockCyclicMultiplyMap.initJob(this.getBlockedMatrixPath(),
+          ((DenseMatrix) B).getBlockedMatrixPath(),
           BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
           jobConf);
       BlockCyclicMultiplyReduce.initJob(result.getPath(),
@@ -477,7 +478,7 @@
   }
 
   public boolean isBlocked() throws IOException {
-    return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+    return (table.get(Constants.METADATA, Constants.BLOCK_PATH) == null) ? false
         : true;
   }
 
@@ -486,10 +487,78 @@
         Bytes.toBytes(Constants.BLOCK)).getValue());
   }
 
+  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
+    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
+    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
+    table.commit(update);
+  }
+
   /**
-   * @return the size of block
+   * Using a map/reduce job to block a dense matrix.
+   *
+   * @param blockNum the total number of blocks; must be a perfect square
    * @throws IOException
    */
+  public void blocking_mapred(int blockNum) throws IOException {
+    double blocks = Math.pow(blockNum, 0.5);
+    if (!String.valueOf(blocks).endsWith(".0"))
+      throw new IOException("blockNum must be a perfect square.");
+
+    int block_size = (int) blocks;
+    Matrix blockedMatrix = new DenseMatrix(config);
+    blockedMatrix.setDimension(block_size, block_size);
+    this.setBlockedMatrixPath(blockedMatrix.getPath());
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("Blocking MR job" + getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    BlockingMapRed.initJob(this.getPath(), blockedMatrix.getPath(),
+        block_size, this.getRows(), this.getColumns(), jobConf);
+    JobManager.execute(jobConf);
+  }
+
+  public String getBlockedMatrixPath() throws IOException {
+    return Bytes.toString(table.get(Constants.METADATA,
+        Constants.BLOCK_PATH).getValue());
+  }
+
+  protected void setBlockedMatrixPath(String path) throws IOException {
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
+    table.commit(update);
+  }
+
+  /*
+  public void blocking(int blockNum) throws IOException {
+    this.checkBlockNum(blockNum);
+
+    String[] columns = new String[] { Constants.BLOCK_POSITION };
+    Scanner scan = table.getScanner(columns);
+
+    for (RowResult row : scan) {
+      BlockID bID = new BlockID(row.getRow());
+      BlockPosition pos = new BlockPosition(row.get(Constants.BLOCK_POSITION)
+          .getValue());
+
+      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), pos
+          .getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
+    }
+  }
+
+  private void checkBlockNum(int blockNum) throws IOException {
+    double blocks = Math.pow(blockNum, 0.5);
+    // TODO: Check also it is validation with matrix.
+    if (!String.valueOf(blocks).endsWith(".0"))
+      throw new IOException("can't divide.");
+
+    int block_size = (int) blocks;
+    setBlockPosition(block_size);
+    setBlockSize(block_size);
+    LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
+  }
+
   public int getBlockSize() throws IOException {
     return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
         Constants.BLOCK_SIZE).getValue()) : -1;
@@ -501,12 +570,6 @@
     table.commit(update);
   }
 
-  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
-    table.commit(update);
-  }
-
   protected void setBlockPosition(int blockNum) throws IOException {
     int block_row_size = this.getRows() / blockNum;
     int block_column_size = this.getColumns() / blockNum;
@@ -543,59 +606,5 @@
         Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
     return new BlockPosition(rs);
   }
-
-  /**
-   * Using a map/reduce job to block a dense matrix.
-   *
-   * @param blockNum
-   * @throws IOException
-   */
-  public void blocking_mapred(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
-
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("Blocking MR job" + getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    BlockingMapRed.initJob(getPath(), jobConf);
-
-    JobManager.execute(jobConf);
-  }
-
-  /**
-   * Using a scanner to block a dense matrix. If the matrix is large, use the
-   * blocking_mapred()
-   *
-   * @param blockNum
-   * @throws IOException
-   */
-  public void blocking(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
-
-    String[] columns = new String[] { Constants.BLOCK_POSITION };
-    Scanner scan = table.getScanner(columns);
-
-    for (RowResult row : scan) {
-      BlockID bID = new BlockID(row.getRow());
-      BlockPosition pos = new BlockPosition(row.get(Constants.BLOCK_POSITION)
-          .getValue());
-
-      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos.getStartRow(), pos
-          .getEndRow(), pos.getStartColumn(), pos.getEndColumn()));
-    }
-  }
-
-  private void checkBlockNum(int blockNum) throws IOException {
-    double blocks = Math.pow(blockNum, 0.5);
-    // TODO: Check also it is validation with matrix.
-    if (!String.valueOf(blocks).endsWith(".0"))
-      throw new IOException("can't divide.");
-
-    int block_size = (int) blocks;
-    setBlockPosition(block_size);
-    setBlockSize(block_size);
-    LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
-  }
+  */
 }
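blocking_mapred(blockNum) takes the total number of blocks, so the argument must be a perfect square; its square root becomes the width of the block grid, and the blocked copy is stored as a separate matrix table of that dimension. Worked through for the 32 x 32 test matrices used above:

    // blocking_mapred(16) on a 32 x 32 matrix:
    //   grid width   : sqrt(16) = 4, so a 4 x 4 grid of blocks
    //   block shape  : 32 / 4 = 8, so each stored SubMatrix is 8 x 8
    //   blocked copy : a 4 x 4 matrix table whose cell (i, j) holds
    //                  the 8 x 8 SubMatrix at block position (i, j)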
Index: src/java/org/apache/hama/io/BlockWritable.java
===================================================================
--- src/java/org/apache/hama/io/BlockWritable.java	(revision 730068)
+++ src/java/org/apache/hama/io/BlockWritable.java	(working copy)
@@ -19,16 +19,16 @@
  */
 package org.apache.hama.io;
 
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.SubMatrix;
 
 public class BlockWritable implements Writable {
-  public SubMatrix matrix;
+  private SubMatrix matrix;
 
   public BlockWritable() {
     this.matrix = new SubMatrix(0, 0);
@@ -43,14 +43,35 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.matrix = new SubMatrix(Bytes.readByteArray(in));
+    int rows = in.readInt();
+    int columns = in.readInt();
+    this.matrix = new SubMatrix(rows, columns);
+
+    for (int i = 0; i < rows; i++) {
+      for (int j = 0; j < columns; j++) {
+        this.matrix.set(i, j, in.readDouble());
+      }
+    }
   }
 
   public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.matrix.getBytes());
+    out.writeInt(this.matrix.getRows());
+    out.writeInt(this.matrix.getColumns());
+
+    for (int i = 0; i < this.matrix.getRows(); i++) {
+      for (int j = 0; j < this.matrix.getColumns(); j++) {
+        out.writeDouble(this.matrix.get(i, j));
+      }
+    }
   }
 
   public SubMatrix get() {
     return this.matrix;
   }
 }
+
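BlockWritable now streams the dimensions and cells directly through the DataInput/DataOutput instead of materializing the whole block as a byte array via SubMatrix.getBytes() and Bytes.writeByteArray(). That drops one full copy of the block per (de)serialization and makes the MR wire format identical to the HBase cell format introduced in SubMatrix above:

    // <int rows> <int columns> then rows * columns doubles;
    // e.g. a 1 x 2 block costs 4 + 4 + 2 * 8 = 24 bytes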
Index: src/java/org/apache/hama/io/BlockID.java
===================================================================
--- src/java/org/apache/hama/io/BlockID.java	(revision 730068)
+++ src/java/org/apache/hama/io/BlockID.java	(working copy)
@@ -19,20 +19,19 @@
  */
 package org.apache.hama.io;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 
 /** A WritableComparable for BlockIDs. */
 @SuppressWarnings("unchecked")
-public class BlockID implements WritableComparable, java.io.Serializable {
-  private static final long serialVersionUID = 6434651179475226613L;
+public class BlockID implements WritableComparable {
+  /** width, in digits, of each zero-padded coordinate in the row key */
+  public static final int PAD_SIZE = 15;
+
   private int row;
   private int column;
 
@@ -44,18 +43,28 @@
   }
 
   public BlockID(byte[] bytes) throws IOException {
-    ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
+    // The key format is "row,column" with both coordinates zero-padded to
+    // PAD_SIZE digits; Integer.parseInt() ignores the leading zeros.
+    String rKey = Bytes.toString(bytes);
+
     try {
-      obj = oos.readObject();
-      this.row = ((BlockID)obj).getRow();
-      this.column = ((BlockID)obj).getColumn();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
+      String[] keys = rKey.split("[,]");
+      this.row = Integer.parseInt(keys[0]);
+      this.column = Integer.parseInt(keys[1]);
+    } catch (RuntimeException e) {
+      throw new IOException("Invalid BlockID key: " + rKey);
     }
-    oos.close();
-    bos.close();
   }
 
   public void set(int row, int column) {
@@ -72,20 +81,27 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    column = in.readInt();
-    row = in.readInt();
+    BlockID value = new BlockID(Bytes.readByteArray(in));
+    this.row = value.getRow();
+    this.column = value.getColumn();
   }
 
   public void write(DataOutput out) throws IOException {
-    out.writeInt(column);
-    out.writeInt(row);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.toString()));
   }
 
   /**
   * Make BlockID's string representation be same format.
   */
  public String toString() {
-    return row + "," + column;
+    // Zero-pad each coordinate separately so that the lexicographic order
+    // of the HBase row keys matches the numeric (row, column) order. A
+    // single shared pad, as in "zeros,row,column", would break the ordering
+    // as soon as a coordinate reaches two digits, because the commas shift.
+    return pad(row) + "," + pad(column);
  }
+
+  /** Zero-pads n to PAD_SIZE digits. */
+  private static String pad(int n) {
+    StringBuffer buf = new StringBuffer();
+    for (int i = String.valueOf(n).length(); i < PAD_SIZE; ++i) {
+      buf.append("0");
+    }
+    return buf.append(n).toString();
+  }
 
  @Override
@@ -110,19 +126,14 @@
 
   @Override
   public boolean equals(Object o) {
-    if(o == null) return false;
-    if(!(o instanceof BlockID)) return false;
+    if (o == null)
+      return false;
+    if (!(o instanceof BlockID))
+      return false;
     return compareTo(o) == 0;
   }
 
-  public byte[] getBytes() throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
-    bos.close();
-    byte[] data = bos.toByteArray();
-    return data;
+  public byte[] getBytes() {
+    return Bytes.toBytes(this.toString());
   }
 }
Index: src/java/org/apache/hama/mapred/BlockingMapRed.java
===================================================================
--- src/java/org/apache/hama/mapred/BlockingMapRed.java	(revision 730068)
+++ src/java/org/apache/hama/mapred/BlockingMapRed.java	(working copy)
@@ -46,14 +46,22 @@
   static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
   /** Parameter of the path of the matrix to be blocked * */
   public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
+  /** Parameter of the path of the blocked (output) matrix * */
+  public static final String BLOCKED_MATRIX = "hama.blocked.matrix";
+  /** Parameters of the block grid width and the source matrix dimensions * */
+  public static final String BLOCK_SIZE = "hama.blocking.size";
+  public static final String ROWS = "hama.blocking.rows";
+  public static final String COLUMNS = "hama.blocking.columns";
 
   /**
    * Initialize a job to blocking a table
    *
    * @param matrixPath
+   * @param blockedMatrixPath the path of the matrix that receives the blocks
+   * @param block_size the width of the block grid (blocks per side)
+   * @param rows the row count of the source matrix
+   * @param columns the column count of the source matrix
    * @param job
    */
-  public static void initJob(String matrixPath, JobConf job) {
+  public static void initJob(String matrixPath, String blockedMatrixPath,
+      int block_size, int rows, int columns, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
     job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
@@ -64,6 +72,11 @@
     job.setOutputFormat(NullOutputFormat.class);
 
     job.set(BLOCKING_MATRIX, matrixPath);
+    job.set(BLOCKED_MATRIX, blockedMatrixPath);
+    job.set(BLOCK_SIZE, String.valueOf(block_size));
+    job.set(ROWS, String.valueOf(rows));
+    job.set(COLUMNS, String.valueOf(columns));
+
     job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
@@ -73,6 +86,7 @@
   public static abstract class BlockingMapRedBase extends MapReduceBase {
 
     protected DenseMatrix matrix;
+    protected DenseMatrix blockedMatrix;
     protected int mBlockNum;
     protected int mBlockRowSize;
     protected int mBlockColSize;
@@ -85,12 +99,15 @@
       try {
         matrix = new DenseMatrix(new HamaConfiguration(), job.get(
             BLOCKING_MATRIX, ""));
-        mBlockNum = matrix.getBlockSize();
-        mBlockRowSize = matrix.getRows() / mBlockNum;
-        mBlockColSize = matrix.getColumns() / mBlockNum;
+        blockedMatrix = new DenseMatrix(new HamaConfiguration(), job.get(
+            BLOCKED_MATRIX, ""));
 
-        mRows = matrix.getRows();
-        mColumns = matrix.getColumns();
+        mBlockNum = Integer.parseInt(job.get(BLOCK_SIZE, ""));
+        mRows = Integer.parseInt(job.get(ROWS, ""));
+        mColumns = Integer.parseInt(job.get(COLUMNS, ""));
+
+        mBlockRowSize = mRows / mBlockNum;
+        mBlockColSize = mColumns / mBlockNum;
       } catch (IOException e) {
         LOG.warn("Load matrix_blocking failed : " + e.getMessage());
      }
@@ -173,7 +190,7 @@
         }
       }
 
-      matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+      blockedMatrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
     }
   }
 }
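With the wider initJob() signature, everything the tasks need travels in the JobConf, so the mappers and reducers no longer consult block metadata on the source matrix table. The call that DenseMatrix.blocking_mapred() makes (shown earlier in this patch) amounts to the following; the 256 x 256 dimensions are purely illustrative:

    // 256 x 256 source matrix, blocking_mapred(16) -> 4 x 4 grid:
    BlockingMapRed.initJob(matrix.getPath(),   // input matrix table
        blockedMatrix.getPath(),               // output (blocked) table
        4,                                     // block grid width
        256, 256,                              // source dimensions
        jobConf);
    JobManager.execute(jobConf);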
Index: src/java/org/apache/hama/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/AbstractMatrix.java	(revision 730068)
+++ src/java/org/apache/hama/AbstractMatrix.java	(working copy)
@@ -32,6 +32,8 @@
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.HConstants;
 
 /**
  * Methods of the matrix classes
@@ -73,7 +75,10 @@
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
       this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
-      this.tableDesc.addFamily(new HColumnDescriptor(Constants.BLOCK));
+      this.tableDesc.addFamily(new HColumnDescriptor(
+          Bytes.toBytes(Constants.BLOCK), 1, CompressionType.NONE,
+          false, false, Integer.MAX_VALUE, HConstants.FOREVER, false));
 
       LOG.info("Initializing the matrix storage.");
       this.admin.createTable(this.tableDesc);
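The long-form HColumnDescriptor constructor spells out every storage option for the block family rather than taking the defaults. The positional arguments map as follows, assuming the eight-argument constructor of the HBase 0.19 line this patch builds against:

    new HColumnDescriptor(
        Bytes.toBytes(Constants.BLOCK), // family name
        1,                              // maxVersions: keep one version per block
        CompressionType.NONE,           // no compression
        false,                          // inMemory
        false,                          // blockCacheEnabled
        Integer.MAX_VALUE,              // maxValueLength: blocks can be large
        HConstants.FOREVER,             // timeToLive
        false);                         // bloomFilter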