Index: src/java/org/apache/hama/HamaAdminImpl.java
===================================================================
--- src/java/org/apache/hama/HamaAdminImpl.java	(revision 731919)
+++ src/java/org/apache/hama/HamaAdminImpl.java	(working copy)
@@ -117,6 +117,7 @@
     try {
       table.commit(update);
+      table.flushCommits();
       result = true;
     } catch (IOException e) {
       e.printStackTrace();
Index: src/java/org/apache/hama/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/DenseMatrix.java	(revision 731919)
+++ src/java/org/apache/hama/DenseMatrix.java	(working copy)
@@ -32,16 +32,13 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.algebra.BlockCyclicMultiplyMap;
 import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
@@ -56,6 +53,7 @@
 import org.apache.hama.io.VectorWritable;
 import org.apache.hama.mapred.BlockingMapRed;
 import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.JobManager;
 import org.apache.hama.util.RandomVariable;
@@ -263,13 +261,13 @@
     final Path inDir = new Path(TMP_DIR, "in");
     FileInputFormat.setInputPaths(jobConf, inDir);
     jobConf.setMapperClass(RandomMatrixMap.class);
-
-    jobConf.setOutputKeyClass(BooleanWritable.class);
-    jobConf.setOutputValueClass(LongWritable.class);
-    jobConf.setOutputFormat(NullOutputFormat.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(VectorWritable.class);
+
+    RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
+        jobConf);
     jobConf.setSpeculativeExecution(false);
     jobConf.set("matrix.column", String.valueOf(n));
-    jobConf.set("matrix.path", rand.getPath());
 
     jobConf.setInputFormat(SequenceFileInputFormat.class);
     final FileSystem fs = FileSystem.get(jobConf);
@@ -436,6 +434,7 @@
     VectorUpdate update = new VectorUpdate(row);
     update.putAll(((DenseVector) vector).getEntries().entrySet());
     table.commit(update.getBatchUpdate());
+    table.flushCommits();
   }
 
   public void setColumn(int column, Vector vector) throws IOException {
@@ -443,6 +442,7 @@
       VectorUpdate update = new VectorUpdate(i);
       update.put(column, vector.get(i));
       table.commit(update.getBatchUpdate());
+      table.flushCommits();
     }
   }
@@ -490,6 +490,7 @@
     BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
     update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
     table.commit(update);
+    table.flushCommits();
   }
 
   /**
@@ -529,6 +530,7 @@
     update.put(Constants.BLOCK_PATH, Bytes.toBytes(path));
     update.put(Constants.BLOCK_SIZE, Bytes.toBytes(size));
     table.commit(update);
+    table.flushCommits();
   }
 
   public int getBlockedMatrixSize() throws IOException {
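The flushCommits() calls added above matter because the HBase client of this era can buffer writes on the client side: HTable.commit(BatchUpdate) may only queue the update, and nothing reaches the region servers until the write buffer fills or is explicitly flushed. A minimal sketch of the pattern, assuming the HBase 0.19-era client API; the table name "matrix" and the column "column:0" are illustrative, not taken from the patch:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FlushAfterCommit {
      public static void main(String[] args) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "matrix");
        table.setAutoFlush(false);            // writes buffer client-side

        BatchUpdate update = new BatchUpdate(Bytes.toBytes("row-0"));
        update.put("column:0", Bytes.toBytes("0.5"));
        table.commit(update);                 // may only queue the update

        table.flushCommits();                 // drain the buffer to the servers
      }
    }

Without the explicit flush, a caller that commits and then immediately reads the same table can observe missing rows, which is why the patch flushes after every metadata-style commit.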
Index: src/java/org/apache/hama/mapred/RandomMatrixMap.java
===================================================================
--- src/java/org/apache/hama/mapred/RandomMatrixMap.java	(revision 731919)
+++ src/java/org/apache/hama/mapred/RandomMatrixMap.java	(working copy)
@@ -21,16 +21,14 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.DenseVector;
+import org.apache.hama.io.VectorWritable;
 import org.apache.hama.util.RandomVariable;
 import org.apache.log4j.Logger;
 
@@ -38,30 +36,24 @@
  * Generate matrix with random elements
  */
 public class RandomMatrixMap extends MapReduceBase implements
-    Mapper<IntWritable, IntWritable, BooleanWritable, LongWritable> {
+    Mapper<IntWritable, IntWritable, IntWritable, VectorWritable> {
   static final Logger LOG = Logger.getLogger(RandomMatrixMap.class);
-  protected HTable table;
   protected int column;
-
+  protected DenseVector vector = new DenseVector();
 
   @Override
   public void map(IntWritable key, IntWritable value,
-      OutputCollector<BooleanWritable, LongWritable> output, Reporter report)
+      OutputCollector<IntWritable, VectorWritable> output, Reporter report)
       throws IOException {
+    vector.clear();
     for (int i = key.get(); i <= value.get(); i++) {
-      VectorUpdate batchUpdate = new VectorUpdate(i);
       for (int j = 0; j < column; j++) {
-        batchUpdate.put(j, RandomVariable.rand());
+        vector.set(j, RandomVariable.rand());
       }
-      table.commit(batchUpdate.getBatchUpdate());
+      output.collect(key, new VectorWritable(i, vector));
     }
   }
 
   public void configure(JobConf job) {
-    try {
-      column = Integer.parseInt(job.get("matrix.column"));
-      table = new HTable(job.get("matrix.path"));
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    column = Integer.parseInt(job.get("matrix.column"));
   }
 }
Index: src/java/org/apache/hama/mapred/TableInputFormatBase.java
===================================================================
--- src/java/org/apache/hama/mapred/TableInputFormatBase.java	(revision 731919)
+++ src/java/org/apache/hama/mapred/TableInputFormatBase.java	(working copy)
@@ -40,12 +40,11 @@
   protected byte[][] inputColumns;
   protected HTable table;
   protected RowFilterInterface rowFilter;
-  protected static int repeat;
+
   /**
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-  public static final String REPEAT_NUM = "hama.mapred.repeat";
 
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
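With the HTable removed from RandomMatrixMap, the mapper becomes a pure function from a row range to VectorWritable records, and all table I/O moves behind the output format. Its input, as the map signature shows, is a SequenceFile of (IntWritable start, IntWritable end) pairs, one inclusive row range per record. A sketch of preparing such an input file; the path and the single 0..999 range are illustrative, not from the patch:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.mapred.JobConf;

    public class PrepareRandomInput {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        FileSystem fs = FileSystem.get(jobConf);

        // One record per range: RandomMatrixMap.map(key, value, ...)
        // iterates rows from key.get() through value.get(), inclusive.
        Path inFile = new Path("/tmp/hama-rand/in/part0");
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
            inFile, IntWritable.class, IntWritable.class);
        writer.append(new IntWritable(0), new IntWritable(999));
        writer.close();
      }
    }

Note that the mapper collects every row of a range under the same key (the range start), so a single reduce call receives all the VectorWritables generated from that range.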
Index: src/java/org/apache/hama/mapred/VectorOutputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/VectorOutputFormat.java	(revision 731919)
+++ src/java/org/apache/hama/mapred/VectorOutputFormat.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -60,13 +61,14 @@
       m_table = table;
     }
 
-    /** {@inheritDoc} */
-    public void close(Reporter reporter) {
+    public void close(@SuppressWarnings("unused")
+    Reporter reporter) throws IOException {
+      m_table.flushCommits();
     }
 
     /** {@inheritDoc} */
     public void write(IntWritable key, VectorUpdate value) throws IOException {
-      m_table.commit(value.getBatchUpdate());
+      m_table.commit(new BatchUpdate(value.getBatchUpdate()));
     }
   }
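The close() change is the reduce-side counterpart of the same buffering issue: write() goes through HTable.commit(), which may only buffer, so the RecordWriter must drain the buffer before the task exits or the tail of its output can be silently dropped. Condensed to its essentials, the writer contract looks like this (a sketch, not the patch's full class; error handling and the enclosing OutputFormat are omitted):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.io.IntWritable;

    // Minimal shape of a buffering table writer.
    class SketchTableWriter {
      private final HTable m_table;

      SketchTableWriter(HTable table) {
        m_table = table;
      }

      void write(IntWritable key, BatchUpdate update) throws IOException {
        m_table.commit(update);   // buffered while autoflush is off
      }

      void close() throws IOException {
        m_table.flushCommits();   // push anything commit() left buffered
      }
    }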
Index: src/java/org/apache/hama/mapred/RandomMatrixReduce.java
===================================================================
--- src/java/org/apache/hama/mapred/RandomMatrixReduce.java	(revision 0)
+++ src/java/org/apache/hama/mapred/RandomMatrixReduce.java	(revision 0)
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.io.VectorWritable;
+import org.apache.log4j.Logger;
+
+public class RandomMatrixReduce extends MapReduceBase implements
+    Reducer<IntWritable, VectorWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(RandomMatrixReduce.class);
+
+  /**
+   * Use this before submitting a TableReduce job. It will appropriately set up
+   * the JobConf.
+   *
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table, Class<RandomMatrixReduce> reducer,
+      JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator<VectorWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    VectorUpdate update = new VectorUpdate(key.get());
+    while (values.hasNext()) {
+      update.putAll(values.next().entrySet());
+      output.collect(key, update);
+    }
+  }
+
+}
Index: src/java/org/apache/hama/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/AbstractMatrix.java	(revision 731919)
+++ src/java/org/apache/hama/AbstractMatrix.java	(working copy)
@@ -121,6 +121,7 @@
     VectorUpdate update = new VectorUpdate(i);
     update.put(j, value);
     table.commit(update.getBatchUpdate());
+    table.flushCommits();
   }
 
   /** {@inheritDoc} */
@@ -128,6 +129,7 @@
     VectorUpdate update = new VectorUpdate(i);
     update.put(j, value + this.get(i, j));
     table.commit(update.getBatchUpdate());
+    table.flushCommits();
   }
 
   /** {@inheritDoc} */
@@ -137,6 +139,7 @@
     update.put(Constants.METADATA_COLUMNS, columns);
     table.commit(update.getBatchUpdate());
+    table.flushCommits();
   }
 
   public String getRowLabel(int row) throws IOException {
@@ -151,6 +154,7 @@
     VectorUpdate update = new VectorUpdate(row);
     update.put(Constants.ATTRIBUTE + "string", name);
     table.commit(update.getBatchUpdate());
+    table.flushCommits();
   }
 
   public String getColumnLabel(int column) throws IOException {
@@ -174,6 +178,7 @@
     BatchUpdate update = new BatchUpdate(Constants.METADATA);
     update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
     table.commit(update);
+    table.flushCommits();
   }
 
   protected int incrementAndGetRef() throws IOException {
@@ -233,6 +238,7 @@
     BatchUpdate update = new BatchUpdate(Constants.METADATA);
     update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
     table.commit(update);
+    table.flushCommits();
     return hamaAdmin.save(this, aliasename);
   }
 }
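End to end, the patch turns random-matrix generation into a standard map/reduce pass (range SequenceFile -> RandomMatrixMap -> RandomMatrixReduce -> VectorOutputFormat) and makes every metadata write flush eagerly. Hypothetical caller code, assuming DenseMatrix.random keeps its existing (HamaConfiguration, rows, columns) signature and that Matrix exposes setRowLabel and get as the AbstractMatrix hunks suggest:

    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.Matrix;

    public class RandomMatrixExample {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();

        // Now runs the map/reduce job wired up in DenseMatrix above.
        Matrix rand = DenseMatrix.random(conf, 1000, 1000);

        // Commits and immediately flushes to HBase after this patch.
        rand.setRowLabel(0, "first-row");
        System.out.println(rand.get(0, 0));
      }
    }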