Index: build.xml
===================================================================
--- build.xml (revision 942646)
+++ build.xml (working copy)
@@ -106,7 +106,7 @@
@@ -140,7 +140,7 @@
-
@@ -269,7 +269,7 @@
-
+
Index: src/examples/org/apache/hama/examples/AbstractExample.java
===================================================================
--- src/examples/org/apache/hama/examples/AbstractExample.java (revision 942646)
+++ src/examples/org/apache/hama/examples/AbstractExample.java (working copy)
@@ -25,8 +25,8 @@
import org.apache.hama.HamaConfiguration;
public abstract class AbstractExample {
- public static final HamaConfiguration conf = new HamaConfiguration();
- public static List<String> ARGS;
+ protected static final HamaConfiguration conf = new HamaConfiguration();
+ protected static List<String> ARGS;
public static void parseArgs(String[] args) {
List<String> other_args = new ArrayList<String>();
Index: src/examples/org/apache/hama/examples/MatrixMultiplication.java
===================================================================
--- src/examples/org/apache/hama/examples/MatrixMultiplication.java (revision 942646)
+++ src/examples/org/apache/hama/examples/MatrixMultiplication.java (working copy)
@@ -20,12 +20,29 @@
package org.apache.hama.examples;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hama.Constants;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
+import org.apache.hama.examples.mapreduce.*;
+import org.apache.hama.io.BlockID;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.SparseMatrix;
+import org.apache.hama.util.RandomVariable;
public class MatrixMultiplication extends AbstractExample {
public static void main(String[] args) throws IOException {
@@ -58,9 +75,9 @@
c = ((SparseMatrix) a).mult(b);
} else {
if (ARGS.size() > 2) {
- c = ((DenseMatrix) a).mult(b, Integer.parseInt(ARGS.get(2)));
+ c = mult(a, b, Integer.parseInt(ARGS.get(2)));
} else {
- c = ((DenseMatrix) a).mult(b);
+ c = mult(a, b);
}
}
@@ -69,4 +86,166 @@
}
System.out.println("...");
}
+
+ /**
+ * C = A*B using iterative method
+ *
+ * @param A the left matrix
+ * @param B the right matrix
+ * @return C
+ * @throws IOException
+ */
+ public static DenseMatrix mult(Matrix A, Matrix B) throws IOException {
+ ensureForMultiplication(A, B);
+ int columns = 0;
+ if (B.getColumns() == 1 || A.getColumns() == 1)
+ columns = 1;
+ else
+ columns = A.getColumns();
+
+ DenseMatrix result = new DenseMatrix(conf, A.getRows(), columns);
+ List<Job> jobId = new ArrayList<Job>();
+
+ for (int i = 0; i < A.getRows(); i++) {
+ Job job = new Job(conf, "multiplication MR job : " + result.getPath()
+ + " " + i);
+
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+ job.getConfiguration()
+ .set(DenseMatrixVectorMultMap.MATRIX_A, A.getPath());
+ job.getConfiguration().setInt(DenseMatrixVectorMultMap.ITH_ROW, i);
+
+ TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
+ DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
+ job);
+ TableMapReduceUtil.initTableReducerJob(result.getPath(),
+ DenseMatrixVectorMultReduce.class, job);
+ try {
+ job.waitForCompletion(false);
+ jobId.add(job);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ while (!checkAllJobs(jobId)) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * C = A * B using Blocking algorithm
+ *
+ * @param A the left matrix
+ * @param B the right matrix
+ * @param blocks the number of blocks
+ * @return C
+ * @throws IOException
+ */
+ public static DenseMatrix mult(Matrix A, Matrix B, int blocks)
+ throws IOException {
+ ensureForMultiplication(A, B);
+
+ String collectionTable = "collect_" + RandomVariable.randMatrixPath();
+ HTableDescriptor desc = new HTableDescriptor(collectionTable);
+ desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+ new HBaseAdmin(conf).createTable(desc);
+
+ collectBlocksMapRed(A, collectionTable, blocks, true);
+ collectBlocksMapRed(B, collectionTable, blocks, false);
+
+ DenseMatrix result = new DenseMatrix(conf, A.getRows(), A.getColumns());
+
+ Job job = new Job(conf, "multiplication MR job : " + result.getPath());
+
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+
+ TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
+ BlockMultMap.class, BlockID.class, BytesWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(result.getPath(),
+ BlockMultReduce.class, job);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+
+ new HamaAdminImpl(conf, new HBaseAdmin(conf)).delete(collectionTable);
+ return result;
+ }
+
+ /**
+ * Collect Blocks
+ *
+ * @param m the matrix to be blocked
+ * @param collectionTable the collection table
+ * @param blockNum the number of blocks
+ * @param bool true if the matrix is the left operand (A) of A*B, false if it is B
+ * @throws IOException
+ */
+ public static void collectBlocksMapRed(Matrix m, String collectionTable,
+ int blockNum, boolean bool) throws IOException {
+ double blocks = Math.pow(blockNum, 0.5);
+ if (!String.valueOf(blocks).endsWith(".0"))
+ throw new IOException("can't divide.");
+
+ int block_size = (int) blocks;
+ Job job = new Job(conf, "Blocking MR job" + m.getPath());
+
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+
+ job.getConfiguration().set(CollectBlocksMapper.BLOCK_SIZE,
+ String.valueOf(block_size));
+ job.getConfiguration().set(CollectBlocksMapper.ROWS,
+ String.valueOf(m.getRows()));
+ job.getConfiguration().set(CollectBlocksMapper.COLUMNS,
+ String.valueOf(m.getColumns()));
+ job.getConfiguration().setBoolean(CollectBlocksMapper.MATRIX_POS, bool);
+
+ TableMapReduceUtil.initTableMapperJob(m.getPath(), scan,
+ org.apache.hama.examples.mapreduce.CollectBlocksMapper.class, BlockID.class,
+ MapWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(collectionTable,
+ org.apache.hama.examples.mapreduce.CollectBlocksReducer.class, job);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void ensureForMultiplication(Matrix A, Matrix m)
+ throws IOException {
+ if (A.getColumns() != m.getRows()) {
+ throw new IOException("A's columns should equal with B's rows while A*B.");
+ }
+ }
+
+ public static boolean checkAllJobs(List<Job> jobId) throws IOException {
+ Iterator<Job> it = jobId.iterator();
+ boolean allTrue = true;
+ while (it.hasNext()) {
+ if (!it.next().isComplete()) {
+ allTrue = false;
+ }
+ }
+
+ return allTrue;
+ }
}
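
For reference, a minimal driver sketch for the two relocated mult() helpers above (not part of the patch); the matrix names, result names, and the block count 16 are assumed placeholders, and the admin is constructed the same way the blocked mult() does internally.

// Sketch only -- names and block count are assumptions, not part of the patch.
import java.io.IOException;

import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.examples.MatrixMultiplication;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.Matrix;

public class MultDriverSketch {
  public static void main(String[] args) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    HamaAdmin admin = new HamaAdminImpl(conf, new HBaseAdmin(conf));

    Matrix a = admin.getMatrix("origA"); // placeholder matrix names
    Matrix b = admin.getMatrix("origB");

    // Iterative method: one MapReduce job per row of A.
    DenseMatrix c1 = MatrixMultiplication.mult(a, b);
    // Blocking method: the block count must be a perfect square (here 4 x 4 blocks).
    DenseMatrix c2 = MatrixMultiplication.mult(a, b, 16);

    c1.save("resultIterative"); // placeholder result names
    c2.save("resultBlocked");
  }
}
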
Index: src/examples/org/apache/hama/examples/MatrixNorm.java
===================================================================
--- src/examples/org/apache/hama/examples/MatrixNorm.java (revision 942646)
+++ src/examples/org/apache/hama/examples/MatrixNorm.java (working copy)
@@ -21,8 +21,20 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hama.Constants;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.mapreduce.MatrixNormMapReduce;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.Matrix.Norm;
@@ -40,16 +52,198 @@
Matrix a = admin.getMatrix(ARGS.get(0));
if(ARGS.get(1).equalsIgnoreCase("one")) {
System.out.println("The maximum absolute column sum of matrix '" + ARGS.get(0)
- + "' is " + a.norm(Norm.One));
+ + "' is " + norm(a, Norm.One));
} else if(ARGS.get(1).equalsIgnoreCase("infinity")) {
System.out.println("The maximum absolute row sum of matrix '" + ARGS.get(0)
- + "' is " + a.norm(Norm.Infinity));
+ + "' is " + norm(a, Norm.Infinity));
} else if(ARGS.get(1).equalsIgnoreCase("frobenius")) {
System.out.println("The root of sum of squares of matrix '" + ARGS.get(0)
- + "' is " + a.norm(Norm.Frobenius));
+ + "' is " + norm(a, Norm.Frobenius));
} else {
System.out.println("The max absolute cell value of matrix '" + ARGS.get(0)
- + "' is " + a.norm(Norm.Maxvalue));
+ + "' is " + norm(a, Norm.Maxvalue));
}
}
+
+ /**
+ * Computes the given norm of the matrix
+ *
+ * @param a the matrix
+ * @param type the norm type to compute
+ * @return norm of the matrix
+ * @throws IOException
+ */
+ public static double norm(Matrix a, Norm type) throws IOException {
+ if (type == Norm.One)
+ return getNorm1(a);
+ else if (type == Norm.Frobenius)
+ return getFrobenius(a);
+ else if (type == Norm.Infinity)
+ return getInfinity(a);
+ else
+ return getMaxvalue(a);
+ }
+
+
+ public static double getNorm1(Matrix a) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ Path outDir = new Path(new Path(a.getType() + "_TMP_norm1_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ Job job = new Job(conf, "norm1 MR job : " + a.getPath());
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+
+ TableMapReduceUtil.initTableMapperJob(a.getPath(), scan,
+ MatrixNormMapReduce.MatrixOneNormMapper.class, IntWritable.class,
+ DoubleWritable.class, job);
+
+ job.setCombinerClass(MatrixNormMapReduce.MatrixOneNormCombiner.class);
+ job.setReducerClass(MatrixNormMapReduce.MatrixOneNormReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(DoubleWritable.class);
+ SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+ try {
+ job.waitForCompletion(true);
+ System.out.println(job.reduceProgress());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+
+ // read outputs
+ double result = readOutput(conf, fs, outDir);
+ fs.delete(outDir.getParent(), true);
+ return result;
+ }
+
+ protected static double getMaxvalue(Matrix a) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ Path outDir = new Path(new Path(a.getType() + "_TMP_normMaxValue_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ Job job = new Job(conf, "MaxValue Norm MR job : " + a.getPath());
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+
+ TableMapReduceUtil.initTableMapperJob(a.getPath(), scan,
+ MatrixNormMapReduce.MatrixMaxValueNormMapper.class, IntWritable.class,
+ DoubleWritable.class, job);
+
+ job.setCombinerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
+ job.setReducerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(DoubleWritable.class);
+ SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+
+ // read outputs
+ double result = readOutput(conf, fs, outDir);
+ fs.delete(outDir.getParent(), true);
+ return result;
+ }
+
+ protected static double getInfinity(Matrix a) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ Path outDir = new Path(new Path(a.getType() + "_TMP_normInfinity_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ Job job = new Job(conf, "Infinity Norm MR job : " + a.getPath());
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+
+ TableMapReduceUtil.initTableMapperJob(a.getPath(), scan,
+ MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class,
+ DoubleWritable.class, job);
+
+ job.setCombinerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
+ job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(DoubleWritable.class);
+ SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+
+ // read outputs
+ double result = readOutput(conf, fs, outDir);
+ fs.delete(outDir.getParent(), true);
+ return result;
+ }
+
+ protected static double getFrobenius(Matrix a) throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ Path outDir = new Path(new Path(a.getType() + "_TMP_normFrobenius_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ Job job = new Job(conf, "Frobenius Norm MR job : " + a.getPath());
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
+
+ TableMapReduceUtil.initTableMapperJob(a.getPath(), scan,
+ MatrixNormMapReduce.MatrixFrobeniusNormMapper.class, IntWritable.class,
+ DoubleWritable.class, job);
+
+ job.setCombinerClass(MatrixNormMapReduce.MatrixFrobeniusNormCombiner.class);
+ job.setReducerClass(MatrixNormMapReduce.MatrixFrobeniusNormReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(DoubleWritable.class);
+ SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+
+ // read outputs
+ double result = readOutput(conf, fs, outDir);
+ fs.delete(outDir.getParent(), true);
+ return result;
+ }
+
+ public static double readOutput(HamaConfiguration config, FileSystem fs, Path outDir)
+ throws IOException {
+ Path inFile = new Path(outDir, "part-r-00000");
+ IntWritable numInside = new IntWritable();
+ DoubleWritable result = new DoubleWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
+ try {
+ reader.next(numInside, result);
+ } finally {
+ reader.close();
+ }
+ return result.get();
+ }
}
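
A corresponding driver sketch for the relocated norm() helper above (not part of the patch); the matrix name is an assumed placeholder and the admin is constructed as in the multiplication sketch.

// Sketch only -- the matrix name "origA" is an assumption, not part of the patch.
import java.io.IOException;

import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.examples.MatrixNorm;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.Matrix.Norm;

public class NormDriverSketch {
  public static void main(String[] args) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    HamaAdmin admin = new HamaAdminImpl(conf, new HBaseAdmin(conf));
    Matrix a = admin.getMatrix("origA"); // placeholder matrix name

    System.out.println("one       : " + MatrixNorm.norm(a, Norm.One));       // max absolute column sum
    System.out.println("infinity  : " + MatrixNorm.norm(a, Norm.Infinity));  // max absolute row sum
    System.out.println("frobenius : " + MatrixNorm.norm(a, Norm.Frobenius)); // root of sum of squares
    System.out.println("maxvalue  : " + MatrixNorm.norm(a, Norm.Maxvalue));  // max cell value
  }
}
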
Index: src/examples/org/apache/hama/examples/RandomMatrix.java
===================================================================
--- src/examples/org/apache/hama/examples/RandomMatrix.java (revision 942646)
+++ src/examples/org/apache/hama/examples/RandomMatrix.java (working copy)
@@ -21,12 +21,29 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.mapreduce.RandomMatrixMapper;
+import org.apache.hama.examples.mapreduce.RandomMatrixReducer;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.SparseMatrix;
public class RandomMatrix extends AbstractExample {
-
+ static private String TABLE_PREFIX;
+ static private Path TMP_DIR;
+
public static void main(String[] args) throws IOException {
if (args.length < 3) {
System.out
@@ -44,10 +61,145 @@
Matrix a;
if(percent == 100)
- a = DenseMatrix.random_mapred(conf, row, column);
+ a = random_mapred(conf, row, column);
else
- a = SparseMatrix.random_mapred(conf, row, column, percent);
+ a = random_mapred(conf, row, column, percent);
a.save(ARGS.get(3));
}
+
+
+ /**
+ * Generate matrix with random elements using Map/Reduce
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @return an m-by-n matrix with uniformly distributed random elements.
+ * @throws IOException
+ */
+ public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
+ throws IOException {
+ TABLE_PREFIX = "DenseMatrix";
+ TMP_DIR = new Path(TABLE_PREFIX + "_TMP_dir");
+ DenseMatrix rand = new DenseMatrix(conf, m, n);
+
+ Job job = new Job(conf, "random matrix MR job : " + rand.getPath());
+ final Path inDir = new Path(TMP_DIR, "in");
+ FileInputFormat.setInputPaths(job, inDir);
+ job.setMapperClass(RandomMatrixMapper.class);
+
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(MapWritable.class);
+
+ job.getConfiguration().setInt("matrix.column", n);
+ job.getConfiguration().set("matrix.type", TABLE_PREFIX);
+ job.getConfiguration().set("matrix.density", "100");
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ final FileSystem fs = FileSystem.get(job.getConfiguration());
+ int interval = m / conf.getNumMapTasks();
+
+ // generate an input file for each map task
+ for (int i = 0; i < conf.getNumMapTasks(); ++i) {
+ final Path file = new Path(inDir, "part" + i);
+ final IntWritable start = new IntWritable(i * interval);
+ IntWritable end = null;
+ if ((i + 1) != conf.getNumMapTasks()) {
+ end = new IntWritable(((i * interval) + interval) - 1);
+ } else {
+ end = new IntWritable(m - 1);
+ }
+ final SequenceFile.Writer writer = SequenceFile.createWriter(fs, job
+ .getConfiguration(), file, IntWritable.class, IntWritable.class,
+ CompressionType.NONE);
+ try {
+ writer.append(start, end);
+ } finally {
+ writer.close();
+ }
+ System.out.println("Wrote input for Map #" + i);
+ }
+
+ job.setOutputFormatClass(TableOutputFormat.class);
+ job.setReducerClass(RandomMatrixReducer.class);
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rand.getPath());
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Writable.class);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ fs.delete(TMP_DIR, true);
+ return rand;
+ }
+
+ public static SparseMatrix random_mapred(HamaConfiguration conf, int m,
+ int n, double percent) throws IOException {
+ TABLE_PREFIX = "SparseMatrix";
+ TMP_DIR = new Path(TABLE_PREFIX + "_TMP_dir");
+ SparseMatrix rand = new SparseMatrix(conf, m, n);
+
+ Job job = new Job(conf, "random matrix MR job : " + rand.getPath());
+ final Path inDir = new Path(TMP_DIR, "in");
+ FileInputFormat.setInputPaths(job, inDir);
+ job.setMapperClass(RandomMatrixMapper.class);
+
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(MapWritable.class);
+
+ job.getConfiguration().setInt("matrix.column", n);
+ job.getConfiguration().set("matrix.type", TABLE_PREFIX);
+ job.getConfiguration().set("matrix.density", String.valueOf(percent));
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ final FileSystem fs = FileSystem.get(job.getConfiguration());
+ int interval = m / conf.getNumMapTasks();
+
+ // generate an input file for each map task
+ for (int i = 0; i < conf.getNumMapTasks(); ++i) {
+ final Path file = new Path(inDir, "part" + i);
+ final IntWritable start = new IntWritable(i * interval);
+ IntWritable end = null;
+ if ((i + 1) != conf.getNumMapTasks()) {
+ end = new IntWritable(((i * interval) + interval) - 1);
+ } else {
+ end = new IntWritable(m - 1);
+ }
+ final SequenceFile.Writer writer = SequenceFile.createWriter(fs, job
+ .getConfiguration(), file, IntWritable.class, IntWritable.class,
+ CompressionType.NONE);
+ try {
+ writer.append(start, end);
+ } finally {
+ writer.close();
+ }
+ System.out.println("Wrote input for Map #" + i);
+ }
+
+ job.setOutputFormatClass(TableOutputFormat.class);
+ job.setReducerClass(RandomMatrixReducer.class);
+ job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rand.getPath());
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Writable.class);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ fs.delete(TMP_DIR, true);
+ return rand;
+ }
+
}
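
A driver sketch for the relocated random_mapred() helpers above (not part of the patch); the dimensions, density, and output table names are assumed placeholders.

// Sketch only -- sizes, density, and table names are assumptions, not part of the patch.
import java.io.IOException;

import org.apache.hama.HamaConfiguration;
import org.apache.hama.examples.RandomMatrix;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.SparseMatrix;

public class RandomMatrixDriverSketch {
  public static void main(String[] args) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();

    // 1000 x 1000 dense matrix of uniformly distributed random elements.
    DenseMatrix dense = RandomMatrix.random_mapred(conf, 1000, 1000);
    dense.save("randDense");

    // 1000 x 1000 sparse matrix where roughly 10% of each row is non-zero.
    SparseMatrix sparse = RandomMatrix.random_mapred(conf, 1000, 1000, 10);
    sparse.save("randSparse");
  }
}
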
Index: src/examples/org/apache/hama/examples/mapreduce/BlockMultMap.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/BlockMultMap.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/BlockMultMap.java (revision 0)
@@ -0,0 +1,25 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.SubMatrix;
+
+public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
+ private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK);
+
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ SubMatrix a = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("a")));
+ SubMatrix b = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("b")));
+
+ SubMatrix c = a.mult(b);
+ context.write(new BlockID(key.get()), new BytesWritable(c.getBytes()));
+ }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/BlockMultReduce.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/BlockMultReduce.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/BlockMultReduce.java (revision 0)
@@ -0,0 +1,46 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockMultReduce extends
+ TableReducer<BlockID, BytesWritable, Writable> {
+
+ @Override
+ public void reduce(BlockID key, Iterable<BytesWritable> values,
+ Context context) throws IOException, InterruptedException {
+ SubMatrix s = null;
+ for (BytesWritable value : values) {
+ SubMatrix b = new SubMatrix(value.getBytes());
+ if (s == null) {
+ s = b;
+ } else {
+ s = s.add(b);
+ }
+ }
+
+ int startRow = key.getRow() * s.getRows();
+ int startColumn = key.getColumn() * s.getColumns();
+
+ for (int i = 0; i < s.getRows(); i++) {
+ Put put = new Put(BytesUtil.getRowIndex(i + startRow));
+ for (int j = 0; j < s.getColumns(); j++) {
+ put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j + startColumn)),
+ Bytes.toBytes(s.get(i, j)));
+ }
+
+ context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key
+ .getRow())), put);
+ }
+ }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/CollectBlocksMapper.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/CollectBlocksMapper.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/CollectBlocksMapper.java (revision 0)
@@ -0,0 +1,63 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class CollectBlocksMapper extends TableMapper<BlockID, MapWritable>
+ implements Configurable {
+ private Configuration conf = null;
+ /** Configuration keys for the blocking job */
+ public static final String BLOCK_SIZE = "hama.blocking.size";
+ public static final String ROWS = "hama.blocking.rows";
+ public static final String COLUMNS = "hama.blocking.columns";
+ public static final String MATRIX_POS = "a.or.b";
+
+ private int mBlockNum;
+ private int mBlockRowSize;
+ private int mBlockColSize;
+ private int mRows;
+ private int mColumns;
+
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ int startColumn, endColumn, blkRow = BytesUtil.getRowIndex(key.get())
+ / mBlockRowSize, i = 0;
+ DenseVector dv = new DenseVector(BytesUtil.getRowIndex(key.get()), value);
+
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if (endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ context.write(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries());
+
+ i++;
+ } while (endColumn < (mColumns - 1));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ mBlockNum = Integer.parseInt(conf.get(BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(conf.get(ROWS, ""));
+ mColumns = Integer.parseInt(conf.get(COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+ }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/CollectBlocksReducer.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/CollectBlocksReducer.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/CollectBlocksReducer.java (revision 0)
@@ -0,0 +1,107 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SubMatrix;
+
+public class CollectBlocksReducer extends
+ TableReducer<BlockID, MapWritable, Writable> implements Configurable {
+ private Configuration conf = null;
+ private int mBlockNum;
+ private int mBlockRowSize;
+ private int mBlockColSize;
+ private int mRows;
+ private int mColumns;
+ private boolean matrixPos;
+
+ public void reduce(BlockID key, Iterable<MapWritable> values,
+ Context context) throws IOException, InterruptedException {
+ // the block's base offset in the original matrix
+ int colBase = key.getColumn() * mBlockColSize;
+ int rowBase = key.getRow() * mBlockRowSize;
+
+ // the block's size : rows & columns
+ int smRows = mBlockRowSize;
+ if ((rowBase + mBlockRowSize - 1) >= mRows)
+ smRows = mRows - rowBase;
+ int smCols = mBlockColSize;
+ if ((colBase + mBlockColSize - 1) >= mColumns)
+ smCols = mColumns - colBase;
+
+ // construct the matrix
+ SubMatrix subMatrix = new SubMatrix(smRows, smCols);
+ // i, j is the current offset in the sub-matrix
+ int i = 0, j = 0;
+ for (MapWritable value : values) {
+ DenseVector vw = new DenseVector(value);
+ // check the size is suitable
+ if (vw.size() != smCols)
+ throw new IOException("Block Column Size dismatched.");
+ i = vw.getRow() - rowBase;
+
+ if (i >= smRows || i < 0)
+ throw new IOException("Block Row Size dismatched.");
+
+ // put the subVector to the subMatrix
+ for (j = 0; j < smCols; j++) {
+ subMatrix.set(i, j, vw.get(colBase + j));
+ }
+ }
+ //BlockWritable outValue = new BlockWritable(subMatrix);
+
+ // It'll be used only for matrix multiplication.
+ if (matrixPos) {
+ for (int x = 0; x < mBlockNum; x++) {
+ int r = (key.getRow() * mBlockNum) * mBlockNum;
+ int seq = (x * mBlockNum) + key.getColumn() + r;
+ BlockID bkID = new BlockID(key.getRow(), x, seq);
+ Put put = new Put(bkID.getBytes());
+ put.add(Bytes.toBytes(Constants.BLOCK),
+ Bytes.toBytes("a"),
+ subMatrix.getBytes());
+ context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+ }
+ } else {
+ for (int x = 0; x < mBlockNum; x++) {
+ int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
+ + key.getRow();
+ BlockID bkID = new BlockID(x, key.getColumn(), seq);
+ Put put = new Put(bkID.getBytes());
+ put.add(Bytes.toBytes(Constants.BLOCK),
+ Bytes.toBytes("b"),
+ subMatrix.getBytes());
+ context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ mBlockNum = Integer.parseInt(conf.get(CollectBlocksMapper.BLOCK_SIZE, ""));
+ mRows = Integer.parseInt(conf.get(CollectBlocksMapper.ROWS, ""));
+ mColumns = Integer.parseInt(conf.get(CollectBlocksMapper.COLUMNS, ""));
+
+ mBlockRowSize = mRows / mBlockNum;
+ mBlockColSize = mColumns / mBlockNum;
+
+ matrixPos = conf.getBoolean(CollectBlocksMapper.MATRIX_POS, true);
+ }
+}
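
To see how the two replication loops above pair the sub-matrices of A and B, here is a stand-alone sketch (plain Java, no Hama dependencies, not part of the patch) that evaluates both sequence formulas for every term of an n-by-n blocking; matching A and B blocks land under the same (row, column, sequence) BlockID, which is what lets BlockMultMap read them back from the "a" and "b" columns of a single row.

// Sketch only -- a paper check of the BlockID sequence numbers used above.
public class BlockSeqSketch {
  public static void main(String[] args) {
    int n = 2; // blocks per dimension, i.e. collectBlocksMapRed was called with blocks = n * n
    for (int i = 0; i < n; i++)        // block row of C
      for (int j = 0; j < n; j++)      // block column of C
        for (int k = 0; k < n; k++) {  // summation index: C(i,j) += A(i,k) * B(k,j)
          // A block (i,k), replicated with x = j (matrixPos == true branch)
          int r = (i * n) * n;
          int seqA = (j * n) + k + r;
          // B block (k,j), replicated with x = i (matrixPos == false branch)
          int seqB = (i * n * n) + (j * n) + k;
          System.out.printf("C(%d,%d), term k=%d -> seqA=%d, seqB=%d%n", i, j, k, seqA, seqB);
        }
  }
}
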
Index: src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultMap.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultMap.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultMap.java (revision 0)
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class DenseMatrixVectorMultMap extends
+ TableMapper<IntWritable, MapWritable> implements Configurable {
+ private Configuration conf = null;
+ protected DenseVector currVector;
+ public static final String ITH_ROW = "ith.row";
+ public static final String MATRIX_A = "hama.multiplication.matrix.a";
+ private IntWritable nKey = new IntWritable();
+
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ double ithjth = currVector.get(BytesUtil.getRowIndex(key.get()));
+ if (ithjth != 0) {
+ DenseVector scaled = new DenseVector(value).scale(ithjth);
+ context.write(nKey, scaled.getEntries());
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ DenseMatrix matrix_a;
+ try {
+ matrix_a = new DenseMatrix(new HamaConfiguration(conf), conf.get(MATRIX_A,
+ ""));
+ int ithRow = conf.getInt(ITH_ROW, 0);
+ nKey.set(ithRow);
+ currVector = matrix_a.getRow(ithRow);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
Index: src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultReduce.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultReduce.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/DenseMatrixVectorMultReduce.java (revision 0)
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class DenseMatrixVectorMultReduce extends
+ TableReducer<IntWritable, MapWritable, Writable> {
+
+ @Override
+ public void reduce(IntWritable key, Iterable<MapWritable> values,
+ Context context) throws IOException, InterruptedException {
+ DenseVector sum = new DenseVector();
+
+ for (MapWritable value : values) {
+ DenseVector nVector = new DenseVector(value);
+
+ if (sum.size() == 0) {
+ sum.zeroFill(nVector.size());
+ sum.add(nVector);
+ } else {
+ sum.add(nVector);
+ }
+ }
+
+ Put put = new Put(BytesUtil.getRowIndex(key.get()));
+ for (Map.Entry<Writable, Writable> e : sum.getEntries().entrySet()) {
+ put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
+ .valueOf(((IntWritable) e.getKey()).get())), Bytes
+ .toBytes(((DoubleWritable) e.getValue()).get()));
+ }
+
+ context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key.get())),
+ put);
+ }
+}
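
The map/reduce pair above implements row i of C = sum over j of A[i][j] times row j of B: the mapper scales each row of B by the matching entry of A's i-th row, and the reducer sums the scaled rows. A minimal in-memory sketch of the same identity (plain arrays instead of HBase-backed DenseVectors, not part of the patch):

// Sketch only -- the identity behind DenseMatrixVectorMultMap/Reduce, on plain arrays.
import java.util.Arrays;

public class RowVectorMultSketch {
  public static void main(String[] args) {
    double[][] a = { { 1, 2 }, { 3, 4 } };
    double[][] b = { { 5, 6 }, { 7, 8 } };
    int i = 0; // the row of C being computed, as selected by the ITH_ROW job parameter

    double[] rowOfC = new double[b[0].length];
    for (int j = 0; j < b.length; j++) {        // one map() call per row j of B
      double aij = a[i][j];                     // currVector.get(j) in the mapper
      if (aij != 0)                             // the mapper also skips zero entries
        for (int col = 0; col < b[j].length; col++)
          rowOfC[col] += aij * b[j][col];       // the reducer's running sum
    }
    System.out.println(Arrays.toString(rowOfC)); // [19.0, 22.0] for this input
  }
}
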
Index: src/examples/org/apache/hama/examples/mapreduce/MatrixNormMapReduce.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/MatrixNormMapReduce.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/MatrixNormMapReduce.java (revision 0)
@@ -0,0 +1,221 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hama.Constants;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/** A catalog class collecting the MapReduce classes used to compute the matrix norms */
+public class MatrixNormMapReduce {
+ public final static IntWritable nKey = new IntWritable(-1);
+
+ /** Infinity Norm */
+ public static class MatrixInfinityNormMapper extends
+ TableMapper<IntWritable, DoubleWritable> {
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+
+ double rowSum = 0;
+ NavigableMap<byte[], byte[]> v = value
+ .getFamilyMap(Constants.COLUMNFAMILY);
+ for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+ rowSum += Math.abs(Bytes.toDouble(e.getValue()));
+ }
+
+ nValue.set(rowSum);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+
+ /**
+ * Matrix Infinity Norm Reducer
+ */
+ public static class MatrixInfinityNormReduce extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ static final Logger LOG = Logger.getLogger(MatrixInfinityNormReduce.class);
+ private double max = 0;
+ private DoubleWritable nValue = new DoubleWritable();
+
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+ for (DoubleWritable val : values) {
+ max = Math.max(val.get(), max);
+ }
+
+ nValue.set(max);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+
+ /** One Norm Mapper */
+ public static class MatrixOneNormMapper extends
+ TableMapper<IntWritable, DoubleWritable> {
+ private IntWritable newkey = new IntWritable();
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+
+ NavigableMap<byte[], byte[]> v = value
+ .getFamilyMap(Constants.COLUMNFAMILY);
+ for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+ newkey.set(BytesUtil.bytesToInt(e.getKey()));
+ nValue.set(Bytes.toDouble(e.getValue()));
+ context.write(newkey, nValue);
+ }
+ }
+ }
+
+ /** One Norm Combiner */
+ public static class MatrixOneNormCombiner extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ double partialColSum = 0;
+ for (DoubleWritable val : values) {
+ partialColSum += val.get();
+ }
+
+ nValue.set(partialColSum);
+ context.write(key, nValue);
+ }
+ }
+
+ /** One Norm Reducer */
+ public static class MatrixOneNormReducer extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private double max = 0;
+
+ @Override
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+ double colSum = 0;
+
+ for (DoubleWritable val : values) {
+ colSum += val.get();
+ }
+
+ max = Math.max(Math.abs(colSum), max);
+ }
+
+ public void cleanup(Context context) throws IOException,
+ InterruptedException {
+ context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
+ }
+ }
+
+ /** Frobenius Norm Mapper */
+ public static class MatrixFrobeniusNormMapper extends
+ TableMapper<IntWritable, DoubleWritable> {
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ double rowSqrtSum = 0;
+
+ NavigableMap<byte[], byte[]> v = value
+ .getFamilyMap(Constants.COLUMNFAMILY);
+ for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+ double cellValue = Bytes.toDouble(e.getValue());
+ rowSqrtSum += (cellValue * cellValue);
+ }
+
+ nValue.set(rowSqrtSum);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+
+ /** Frobenius Norm Combiner */
+ public static class MatrixFrobeniusNormCombiner extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private double sqrtSum = 0;
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+ for (DoubleWritable val : values) {
+ sqrtSum += val.get();
+ }
+
+ nValue.set(sqrtSum);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+
+ /** Frobenius Norm Reducer */
+ public static class MatrixFrobeniusNormReducer extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private double sqrtSum = 0;
+
+ @Override
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+ for (DoubleWritable val : values) {
+ sqrtSum += val.get();
+ }
+
+ context.write(MatrixNormMapReduce.nKey, new DoubleWritable(Math
+ .sqrt(sqrtSum)));
+ }
+ }
+
+ /** MaxValue Norm Mapper */
+ public static class MatrixMaxValueNormMapper extends
+ TableMapper<IntWritable, DoubleWritable> {
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ double max = 0;
+
+ NavigableMap<byte[], byte[]> v = value
+ .getFamilyMap(Constants.COLUMNFAMILY);
+ for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
+ double cellValue = Bytes.toDouble(e.getValue());
+ max = cellValue > max ? cellValue : max;
+ }
+
+ nValue.set(max);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+
+ /** MaxValue Norm Reducer */
+ public static class MatrixMaxValueNormReducer extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private double max = 0;
+ private DoubleWritable nValue = new DoubleWritable();
+
+ @Override
+ public void reduce(IntWritable key, Iterable<DoubleWritable> values,
+ Context context) throws IOException, InterruptedException {
+ for (DoubleWritable val : values) {
+ max = Math.max(val.get(), max);
+ }
+
+ nValue.set(max);
+ context.write(MatrixNormMapReduce.nKey, nValue);
+ }
+ }
+}
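
For reference, a single-process sketch of the four quantities the mapper/combiner/reducer classes above compute, on a small in-memory matrix (not part of the patch); it mirrors the MapReduce code, including its use of signed column sums for the one-norm and raw cell values for the max-value norm.

// Sketch only -- in-memory reference for the norms computed by the classes above.
public class NormReferenceSketch {
  public static void main(String[] args) {
    double[][] m = { { 1, -2 }, { -3, 4 } };
    double one = 0, infinity = 0, frobenius = 0, maxvalue = 0;
    double[] colSums = new double[m[0].length];

    for (double[] row : m) {
      double rowSum = 0;
      for (int j = 0; j < row.length; j++) {
        rowSum += Math.abs(row[j]);               // MatrixInfinityNormMapper
        colSums[j] += row[j];                     // MatrixOneNormMapper + Combiner
        frobenius += row[j] * row[j];             // MatrixFrobeniusNormMapper
        maxvalue = Math.max(maxvalue, row[j]);    // MatrixMaxValueNormMapper/Reducer
      }
      infinity = Math.max(infinity, rowSum);      // MatrixInfinityNormReduce
    }
    for (double s : colSums)
      one = Math.max(one, Math.abs(s));           // MatrixOneNormReducer
    frobenius = Math.sqrt(frobenius);             // MatrixFrobeniusNormReducer

    System.out.println(one + " " + infinity + " " + frobenius + " " + maxvalue);
  }
}
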
Index: src/examples/org/apache/hama/examples/mapreduce/RandomMatrixMapper.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/RandomMatrixMapper.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/RandomMatrixMapper.java (revision 0)
@@ -0,0 +1,73 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SparseVector;
+import org.apache.hama.matrix.Vector;
+import org.apache.hama.util.RandomVariable;
+import org.apache.log4j.Logger;
+
+public class RandomMatrixMapper extends
+ Mapper<IntWritable, IntWritable, IntWritable, MapWritable> implements
+ Configurable {
+ private Configuration conf = null;
+ static final Logger LOG = Logger.getLogger(RandomMatrixMapper.class);
+ protected int column;
+ protected double density;
+ protected int minNums;
+ protected String type;
+ protected Vector vector = new DenseVector();
+
+ public void map(IntWritable key, IntWritable value,
+ Context context)
+ throws IOException, InterruptedException {
+
+ if (type.equals("SparseMatrix")) {
+ for (int i = key.get(); i <= value.get(); i++) {
+ ((SparseVector) vector).clear();
+ for (int j = 0; j < minNums; j++) {
+ ((SparseVector) vector).set(RandomVariable.randInt(0, column - 1),
+ RandomVariable.rand());
+ }
+ context.write(new IntWritable(i), vector.getEntries());
+ }
+ } else {
+ for (int i = key.get(); i <= value.get(); i++) {
+ ((DenseVector) vector).clear();
+ for (int j = 0; j < column; j++) {
+ ((DenseVector) vector).set(j, RandomVariable.rand());
+ }
+ context.write(new IntWritable(i), vector.getEntries());
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ column = conf.getInt("matrix.column", 0);
+ density = Double.parseDouble(conf.get("matrix.density"));
+
+ double vv = (column / 100.0) * density;
+ minNums = Math.round((float) vv);
+ if (minNums == 0)
+ minNums = 1;
+
+ type = conf.get("matrix.type");
+ if (type.equals("SparseMatrix"))
+ vector = new SparseVector();
+ else
+ vector = new DenseVector();
+ }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/RandomMatrixReducer.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/RandomMatrixReducer.java (revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/RandomMatrixReducer.java (revision 0)
@@ -0,0 +1,34 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+public class RandomMatrixReducer extends
+ TableReducer<IntWritable, MapWritable, Writable> {
+ static final Logger LOG = Logger.getLogger(RandomMatrixReducer.class);
+
+ public void reduce(IntWritable key, Iterable<MapWritable> values,
+ Context context) throws IOException, InterruptedException {
+ Put put = new Put(BytesUtil.getRowIndex(key.get()));
+ for (Map.Entry<Writable, Writable> e : values.iterator().next().entrySet()) {
+ put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
+ .valueOf(((IntWritable) e.getKey()).get())), Bytes
+ .toBytes(((DoubleWritable) e.getValue()).get()));
+ }
+
+ context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key.get())),
+ put);
+ }
+}
Index: src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java
===================================================================
--- src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java (revision 942646)
+++ src/java/org/apache/hama/mapreduce/CollectBlocksMapper.java (working copy)
@@ -1,63 +0,0 @@
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.matrix.DenseVector;
-import org.apache.hama.util.BytesUtil;
-
-public class CollectBlocksMapper extends TableMapper<BlockID, MapWritable>
- implements Configurable {
- private Configuration conf = null;
- /** Parameter of the path of the matrix to be blocked * */
- public static final String BLOCK_SIZE = "hama.blocking.size";
- public static final String ROWS = "hama.blocking.rows";
- public static final String COLUMNS = "hama.blocking.columns";
- public static final String MATRIX_POS = "a.or.b";
-
- private int mBlockNum;
- private int mBlockRowSize;
- private int mBlockColSize;
- private int mRows;
- private int mColumns;
-
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- int startColumn, endColumn, blkRow = BytesUtil.getRowIndex(key.get())
- / mBlockRowSize, i = 0;
- DenseVector dv = new DenseVector(BytesUtil.getRowIndex(key.get()), value);
-
- do {
- startColumn = i * mBlockColSize;
- endColumn = startColumn + mBlockColSize - 1;
- if (endColumn >= mColumns) // the last sub vector
- endColumn = mColumns - 1;
- context.write(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries());
-
- i++;
- } while (endColumn < (mColumns - 1));
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
-
- mBlockNum = Integer.parseInt(conf.get(BLOCK_SIZE, ""));
- mRows = Integer.parseInt(conf.get(ROWS, ""));
- mColumns = Integer.parseInt(conf.get(COLUMNS, ""));
-
- mBlockRowSize = mRows / mBlockNum;
- mBlockColSize = mColumns / mBlockNum;
- }
-}
Index: src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
===================================================================
--- src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (revision 942646)
+++ src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (working copy)
@@ -1,107 +0,0 @@
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.matrix.DenseVector;
-import org.apache.hama.matrix.SubMatrix;
-
-public class CollectBlocksReducer extends
- TableReducer<BlockID, MapWritable, Writable> implements Configurable {
- private Configuration conf = null;
- private int mBlockNum;
- private int mBlockRowSize;
- private int mBlockColSize;
- private int mRows;
- private int mColumns;
- private boolean matrixPos;
-
- public void reduce(BlockID key, Iterable<MapWritable> values,
- Context context) throws IOException, InterruptedException {
- // the block's base offset in the original matrix
- int colBase = key.getColumn() * mBlockColSize;
- int rowBase = key.getRow() * mBlockRowSize;
-
- // the block's size : rows & columns
- int smRows = mBlockRowSize;
- if ((rowBase + mBlockRowSize - 1) >= mRows)
- smRows = mRows - rowBase;
- int smCols = mBlockColSize;
- if ((colBase + mBlockColSize - 1) >= mColumns)
- smCols = mColumns - colBase;
-
- // construct the matrix
- SubMatrix subMatrix = new SubMatrix(smRows, smCols);
- // i, j is the current offset in the sub-matrix
- int i = 0, j = 0;
- for (MapWritable value : values) {
- DenseVector vw = new DenseVector(value);
- // check the size is suitable
- if (vw.size() != smCols)
- throw new IOException("Block Column Size dismatched.");
- i = vw.getRow() - rowBase;
-
- if (i >= smRows || i < 0)
- throw new IOException("Block Row Size dismatched.");
-
- // put the subVector to the subMatrix
- for (j = 0; j < smCols; j++) {
- subMatrix.set(i, j, vw.get(colBase + j));
- }
- }
- //BlockWritable outValue = new BlockWritable(subMatrix);
-
- // It'll used for only matrix multiplication.
- if (matrixPos) {
- for (int x = 0; x < mBlockNum; x++) {
- int r = (key.getRow() * mBlockNum) * mBlockNum;
- int seq = (x * mBlockNum) + key.getColumn() + r;
- BlockID bkID = new BlockID(key.getRow(), x, seq);
- Put put = new Put(bkID.getBytes());
- put.add(Bytes.toBytes(Constants.BLOCK),
- Bytes.toBytes("a"),
- subMatrix.getBytes());
- context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
- }
- } else {
- for (int x = 0; x < mBlockNum; x++) {
- int seq = (x * mBlockNum * mBlockNum) + (key.getColumn() * mBlockNum)
- + key.getRow();
- BlockID bkID = new BlockID(x, key.getColumn(), seq);
- Put put = new Put(bkID.getBytes());
- put.add(Bytes.toBytes(Constants.BLOCK),
- Bytes.toBytes("b"),
- subMatrix.getBytes());
- context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
- }
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
-
- mBlockNum = Integer.parseInt(conf.get(CollectBlocksMapper.BLOCK_SIZE, ""));
- mRows = Integer.parseInt(conf.get(CollectBlocksMapper.ROWS, ""));
- mColumns = Integer.parseInt(conf.get(CollectBlocksMapper.COLUMNS, ""));
-
- mBlockRowSize = mRows / mBlockNum;
- mBlockColSize = mColumns / mBlockNum;
-
- matrixPos = conf.getBoolean(CollectBlocksMapper.MATRIX_POS, true);
- }
-}
Index: src/java/org/apache/hama/mapreduce/RandomMatrixMapper.java
===================================================================
--- src/java/org/apache/hama/mapreduce/RandomMatrixMapper.java (revision 942646)
+++ src/java/org/apache/hama/mapreduce/RandomMatrixMapper.java (working copy)
@@ -1,73 +0,0 @@
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hama.matrix.DenseVector;
-import org.apache.hama.matrix.SparseVector;
-import org.apache.hama.matrix.Vector;
-import org.apache.hama.util.RandomVariable;
-import org.apache.log4j.Logger;
-
-public class RandomMatrixMapper extends
- Mapper<IntWritable, IntWritable, IntWritable, MapWritable> implements
- Configurable {
- private Configuration conf = null;
- static final Logger LOG = Logger.getLogger(RandomMatrixMapper.class);
- protected int column;
- protected double density;
- protected int minNums;
- protected String type;
- protected Vector vector = new DenseVector();
-
- public void map(IntWritable key, IntWritable value,
- Context context)
- throws IOException, InterruptedException {
-
- if (type.equals("SparseMatrix")) {
- for (int i = key.get(); i <= value.get(); i++) {
- ((SparseVector) vector).clear();
- for (int j = 0; j < minNums; j++) {
- ((SparseVector) vector).set(RandomVariable.randInt(0, column - 1),
- RandomVariable.rand());
- }
- context.write(new IntWritable(i), vector.getEntries());
- }
- } else {
- for (int i = key.get(); i <= value.get(); i++) {
- ((DenseVector) vector).clear();
- for (int j = 0; j < column; j++) {
- ((DenseVector) vector).set(j, RandomVariable.rand());
- }
- context.write(new IntWritable(i), vector.getEntries());
- }
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- column = conf.getInt("matrix.column", 0);
- density = Double.parseDouble(conf.get("matrix.density"));
-
- double vv = (column / 100.0) * density;
- minNums = Math.round((float) vv);
- if (minNums == 0)
- minNums = 1;
-
- type = conf.get("matrix.type");
- if (type.equals("SparseMatrix"))
- vector = new SparseVector();
- else
- vector = new DenseVector();
- }
-}
Index: src/java/org/apache/hama/mapreduce/RandomMatrixReducer.java
===================================================================
--- src/java/org/apache/hama/mapreduce/RandomMatrixReducer.java (revision 942646)
+++ src/java/org/apache/hama/mapreduce/RandomMatrixReducer.java (working copy)
@@ -1,34 +0,0 @@
-package org.apache.hama.mapreduce;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-public class RandomMatrixReducer extends
- TableReducer<IntWritable, MapWritable, Writable> {
- static final Logger LOG = Logger.getLogger(RandomMatrixReducer.class);
-
- public void reduce(IntWritable key, Iterable<MapWritable> values,
- Context context) throws IOException, InterruptedException {
- Put put = new Put(BytesUtil.getRowIndex(key.get()));
- for (Map.Entry<Writable, Writable> e : values.iterator().next().entrySet()) {
- put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
- .valueOf(((IntWritable) e.getKey()).get())), Bytes
- .toBytes(((DoubleWritable) e.getValue()).get()));
- }
-
- context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key.get())),
- put);
- }
-}
Index: src/java/org/apache/hama/matrix/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/matrix/AbstractMatrix.java (revision 942654)
+++ src/java/org/apache/hama/matrix/AbstractMatrix.java (working copy)
@@ -27,8 +27,6 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
@@ -40,20 +38,15 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hama.Constants;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.matrix.algebra.MatrixNormMapReduce;
import org.apache.hama.matrix.algebra.TransposeMap;
import org.apache.hama.matrix.algebra.TransposeReduce;
import org.apache.hama.util.BytesUtil;
@@ -167,169 +160,6 @@
return this.table;
}
- protected double getNorm1() throws IOException {
- final FileSystem fs = FileSystem.get(config);
- Path outDir = new Path(new Path(getType() + "_TMP_norm1_dir_"
- + System.currentTimeMillis()), "out");
- if (fs.exists(outDir))
- fs.delete(outDir, true);
-
- Job job = new Job(config, "norm1 MR job : " + this.getPath());
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
-
- TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixNormMapReduce.MatrixOneNormMapper.class, IntWritable.class,
- DoubleWritable.class, job);
-
- job.setCombinerClass(MatrixNormMapReduce.MatrixOneNormCombiner.class);
- job.setReducerClass(MatrixNormMapReduce.MatrixOneNormReducer.class);
- job.setNumReduceTasks(1);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(DoubleWritable.class);
- SequenceFileOutputFormat.setOutputPath(job, outDir);
-
- try {
- job.waitForCompletion(true);
- System.out.println(job.reduceProgress());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- // read outputs
- double result = readOutput(config, fs, outDir);
- fs.delete(outDir.getParent(), true);
- return result;
- }
-
- protected double getMaxvalue() throws IOException {
- final FileSystem fs = FileSystem.get(config);
- Path outDir = new Path(new Path(getType() + "_TMP_normMaxValue_dir_"
- + System.currentTimeMillis()), "out");
- if (fs.exists(outDir))
- fs.delete(outDir, true);
-
- Job job = new Job(config, "MaxValue Norm MR job : " + this.getPath());
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
-
- TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixNormMapReduce.MatrixMaxValueNormMapper.class, IntWritable.class,
- DoubleWritable.class, job);
-
- job.setCombinerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
- job.setReducerClass(MatrixNormMapReduce.MatrixMaxValueNormReducer.class);
- job.setNumReduceTasks(1);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(DoubleWritable.class);
- SequenceFileOutputFormat.setOutputPath(job, outDir);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- // read outputs
- double result = readOutput(config, fs, outDir);
- fs.delete(outDir.getParent(), true);
- return result;
- }
-
- protected double getInfinity() throws IOException {
- final FileSystem fs = FileSystem.get(config);
- Path outDir = new Path(new Path(getType() + "_TMP_normInifity_dir_"
- + System.currentTimeMillis()), "out");
- if (fs.exists(outDir))
- fs.delete(outDir, true);
-
- Job job = new Job(config, "Infinity Norm MR job : " + this.getPath());
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
-
- TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class,
- DoubleWritable.class, job);
-
- job.setCombinerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
- job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReduce.class);
- job.setNumReduceTasks(1);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(DoubleWritable.class);
- SequenceFileOutputFormat.setOutputPath(job, outDir);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- // read outputs
- double result = readOutput(config, fs, outDir);
- fs.delete(outDir.getParent(), true);
- return result;
- }
-
- protected double getFrobenius() throws IOException {
- final FileSystem fs = FileSystem.get(config);
- Path outDir = new Path(new Path(getType() + "_TMP_normFrobenius_dir_"
- + System.currentTimeMillis()), "out");
- if (fs.exists(outDir))
- fs.delete(outDir, true);
-
- Job job = new Job(config, "Frobenius Norm MR job : " + this.getPath());
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
-
- TableMapReduceUtil.initTableMapperJob(this.getPath(), scan,
- MatrixNormMapReduce.MatrixFrobeniusNormMapper.class, IntWritable.class,
- DoubleWritable.class, job);
-
- job.setCombinerClass(MatrixNormMapReduce.MatrixFrobeniusNormCombiner.class);
- job.setReducerClass(MatrixNormMapReduce.MatrixFrobeniusNormReducer.class);
- job.setNumReduceTasks(1);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(DoubleWritable.class);
- SequenceFileOutputFormat.setOutputPath(job, outDir);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- // read outputs
- double result = readOutput(config, fs, outDir);
- fs.delete(outDir.getParent(), true);
- return result;
- }
-
- private double readOutput(HamaConfiguration config, FileSystem fs, Path outDir)
- throws IOException {
- Path inFile = new Path(outDir, "part-r-00000");
- IntWritable numInside = new IntWritable();
- DoubleWritable result = new DoubleWritable();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
- try {
- reader.next(numInside, result);
- } finally {
- reader.close();
- }
- return result.get();
- }
-
/** {@inheritDoc} */
public int getRows() throws IOException {
Get get = new Get(Bytes.toBytes(Constants.METADATA));
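
The four removed drivers above each run one single-reducer MapReduce job and read a single double back from the job's SequenceFile output. What those jobs compute, written against a plain double[][] instead of an HBase table, is roughly the following sketch, which mirrors the removed mappers and reducers and is not part of the Hama API:

public class NormSketch {
  // getNorm1 / Norm.One: maximum over columns of the absolute value of the
  // column sum, as the removed one-norm reducer computes it.
  static double norm1(double[][] a) {
    double max = 0;
    for (int j = 0; j < a[0].length; j++) {
      double colSum = 0;
      for (int i = 0; i < a.length; i++) {
        colSum += a[i][j];
      }
      max = Math.max(Math.abs(colSum), max);
    }
    return max;
  }

  // getInfinity / Norm.Infinity: maximum absolute row sum.
  static double infinity(double[][] a) {
    double max = 0;
    for (double[] row : a) {
      double rowSum = 0;
      for (double v : row) {
        rowSum += Math.abs(v);
      }
      max = Math.max(rowSum, max);
    }
    return max;
  }

  // getFrobenius / Norm.Frobenius: square root of the sum of squared entries.
  static double frobenius(double[][] a) {
    double sqrtSum = 0;
    for (double[] row : a) {
      for (double v : row) {
        sqrtSum += v * v;
      }
    }
    return Math.sqrt(sqrtSum);
  }

  // getMaxvalue / Norm.Maxvalue: largest entry (the removed mapper compares raw
  // values, not absolute values).
  static double maxValue(double[][] a) {
    double max = 0;
    for (double[] row : a) {
      for (double v : row) {
        max = v > max ? v : max;
      }
    }
    return max;
  }
}
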
Index: src/java/org/apache/hama/matrix/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/matrix/DenseMatrix.java (revision 942654)
+++ src/java/org/apache/hama/matrix/DenseMatrix.java (working copy)
@@ -20,15 +20,11 @@
package org.apache.hama.matrix;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -38,35 +34,22 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.io.BlockID;
import org.apache.hama.io.Pair;
-import org.apache.hama.mapreduce.CollectBlocksMapper;
import org.apache.hama.mapreduce.DummyMapper;
import org.apache.hama.mapreduce.PivotInputFormat;
-import org.apache.hama.mapreduce.RandomMatrixMapper;
-import org.apache.hama.mapreduce.RandomMatrixReducer;
import org.apache.hama.mapreduce.RotationInputFormat;
-import org.apache.hama.matrix.algebra.BlockMultMap;
-import org.apache.hama.matrix.algebra.BlockMultReduce;
-import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
-import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
import org.apache.hama.matrix.algebra.JacobiInitMap;
import org.apache.hama.matrix.algebra.MatrixAdditionMap;
import org.apache.hama.matrix.algebra.MatrixAdditionReduce;
@@ -79,9 +62,6 @@
*/
public class DenseMatrix extends AbstractMatrix implements Matrix {
static private final String TABLE_PREFIX = DenseMatrix.class.getSimpleName();
- static private final Path TMP_DIR = new Path(DenseMatrix.class
- .getSimpleName()
- + "_TMP_dir");
/**
* Construct a raw matrix. Just create a table in HBase.
@@ -228,77 +208,6 @@
}
/**
- * Generate matrix with random elements using Map/Reduce
- *
- * @param conf configuration object
- * @param m the number of rows.
- * @param n the number of columns.
- * @return an m-by-n matrix with uniformly distributed random elements.
- * @throws IOException
- */
- public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
- throws IOException {
- DenseMatrix rand = new DenseMatrix(conf, m, n);
- LOG.info("Create the " + m + " * " + n + " random matrix : "
- + rand.getPath());
-
- Job job = new Job(conf, "random matrix MR job : " + rand.getPath());
- final Path inDir = new Path(TMP_DIR, "in");
- FileInputFormat.setInputPaths(job, inDir);
- job.setMapperClass(RandomMatrixMapper.class);
-
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(MapWritable.class);
-
- job.getConfiguration().setInt("matrix.column", n);
- job.getConfiguration().set("matrix.type", TABLE_PREFIX);
- job.getConfiguration().set("matrix.density", "100");
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- final FileSystem fs = FileSystem.get(job.getConfiguration());
- int interval = m / conf.getNumMapTasks();
-
- // generate an input file for each map task
- for (int i = 0; i < conf.getNumMapTasks(); ++i) {
- final Path file = new Path(inDir, "part" + i);
- final IntWritable start = new IntWritable(i * interval);
- IntWritable end = null;
- if ((i + 1) != conf.getNumMapTasks()) {
- end = new IntWritable(((i * interval) + interval) - 1);
- } else {
- end = new IntWritable(m - 1);
- }
- final SequenceFile.Writer writer = SequenceFile.createWriter(fs, job
- .getConfiguration(), file, IntWritable.class, IntWritable.class,
- CompressionType.NONE);
- try {
- writer.append(start, end);
- } finally {
- writer.close();
- }
- System.out.println("Wrote input for Map #" + i);
- }
-
- job.setOutputFormatClass(TableOutputFormat.class);
- job.setReducerClass(RandomMatrixReducer.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rand.getPath());
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Writable.class);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- fs.delete(TMP_DIR, true);
- return rand;
- }
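
The input-split arithmetic above (repeated in the removed SparseMatrix.random_mapred further below) assigns each map task a contiguous [start, end] row range, with the last task absorbing any remainder. A minimal stand-alone sketch of that arithmetic:

public class RowSplitSketch {
  // Each of numMapTasks splits covers interval = m / numMapTasks rows; the last
  // split additionally absorbs the remainder, ending at row m - 1 inclusive.
  static int[][] rowRanges(int m, int numMapTasks) {
    int interval = m / numMapTasks;
    int[][] ranges = new int[numMapTasks][2];
    for (int i = 0; i < numMapTasks; i++) {
      ranges[i][0] = i * interval;
      ranges[i][1] = (i + 1 != numMapTasks) ? (i * interval) + interval - 1 : m - 1;
    }
    return ranges;
  }
}

For example, rowRanges(20, 3) yields [0, 5], [6, 11], [12, 19], matching the start/end pairs the removed loop writes into one SequenceFile per map task.
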
-
- /**
* Generate identity matrix
*
* @param conf configuration object
@@ -536,112 +445,6 @@
}
/**
- * C = A*B using iterative method
- *
- * @param B
- * @return C
- * @throws IOException
- */
- public DenseMatrix mult(Matrix B) throws IOException {
- ensureForMultiplication(B);
- int columns = 0;
- if (B.getColumns() == 1 || this.getColumns() == 1)
- columns = 1;
- else
- columns = this.getColumns();
-
- DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
- List<Job> jobId = new ArrayList<Job>();
-
- for (int i = 0; i < this.getRows(); i++) {
- Job job = new Job(config, "multiplication MR job : " + result.getPath()
- + " " + i);
-
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
- job.getConfiguration().set(DenseMatrixVectorMultMap.MATRIX_A,
- this.getPath());
- job.getConfiguration().setInt(DenseMatrixVectorMultMap.ITH_ROW, i);
-
- TableMapReduceUtil.initTableMapperJob(B.getPath(), scan,
- DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
- job);
- TableMapReduceUtil.initTableReducerJob(result.getPath(),
- DenseMatrixVectorMultReduce.class, job);
- try {
- job.waitForCompletion(false);
- jobId.add(job);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
- while (checkAllJobs(jobId) == false) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- return result;
- }
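
The "iterative method" above launches one map/reduce job per row i of A; each job scales the rows of the B table by A[i][j] and sums them into row i of C. In plain Java over double[][], the computation those jobs perform is simply the following (illustration only):

public class IterativeMultSketch {
  static double[][] mult(double[][] a, double[][] b) {
    int rows = a.length;
    int inner = b.length;      // must equal a[0].length (A's columns == B's rows)
    int cols = b[0].length;
    double[][] c = new double[rows][cols];
    for (int i = 0; i < rows; i++) {        // one MR job per row i in the removed driver
      for (int j = 0; j < inner; j++) {
        double aij = a[i][j];
        if (aij == 0) {
          continue;                         // the removed mapper skips zero coefficients
        }
        for (int k = 0; k < cols; k++) {
          c[i][k] += aij * b[j][k];         // scaled row j of B, accumulated into row i of C
        }
      }
    }
    return c;
  }
}
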
-
- /**
- * C = A * B using Blocking algorithm
- *
- * @param B
- * @param blocks the number of blocks
- * @return C
- * @throws IOException
- */
- public DenseMatrix mult(Matrix B, int blocks) throws IOException {
- ensureForMultiplication(B);
-
- String collectionTable = "collect_" + RandomVariable.randMatrixPath();
- HTableDescriptor desc = new HTableDescriptor(collectionTable);
- desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
- this.admin.createTable(desc);
- LOG.info("Collect Blocks");
-
- collectBlocksMapRed(this.getPath(), collectionTable, blocks, true);
- collectBlocksMapRed(B.getPath(), collectionTable, blocks, false);
-
- DenseMatrix result = new DenseMatrix(config, this.getRows(), this
- .getColumns());
-
- Job job = new Job(config, "multiplication MR job : " + result.getPath());
-
- Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes(Constants.BLOCK));
-
- TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
- BlockMultMap.class, BlockID.class, BytesWritable.class, job);
- TableMapReduceUtil.initTableReducerJob(result.getPath(),
- BlockMultReduce.class, job);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- hamaAdmin.delete(collectionTable);
- return result;
- }
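
The blocking algorithm above multiplies aligned pairs of submatrices in the map phase and sums the partial block products in the reduce phase; the removed collectBlocksMapRed below requires the requested block count to be a perfect square. A plain-Java sketch of the same computation, assuming square matrices cut into an s-by-s grid of submatrices whose size is divisible by s:

public class BlockMultSketch {
  static double[][] mult(double[][] a, double[][] b, int s) {
    int n = a.length;
    int size = n / s;                                  // rows and columns per block
    double[][] c = new double[n][n];
    for (int bi = 0; bi < s; bi++) {                   // block row of C
      for (int bj = 0; bj < s; bj++) {                 // block column of C
        for (int bk = 0; bk < s; bk++) {               // one partial product A(bi,bk) * B(bk,bj)
          for (int i = bi * size; i < (bi + 1) * size; i++) {
            for (int j = bj * size; j < (bj + 1) * size; j++) {
              for (int k = bk * size; k < (bk + 1) * size; k++) {
                c[i][j] += a[i][k] * b[k][j];          // summed across bk, as in BlockMultReduce
              }
            }
          }
        }
      }
    }
    return c;
  }
}
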
-
- private void ensureForMultiplication(Matrix m) throws IOException {
- if (getColumns() != m.getRows()) {
- throw new IOException("A's columns should equal with B's rows while A*B.");
- }
- }
-
- /**
* C = alpha*A*B + C
*
* @param alpha
@@ -656,24 +459,6 @@
}
/**
- * Computes the given norm of the matrix
- *
- * @param type
- * @return norm of the matrix
- * @throws IOException
- */
- public double norm(Norm type) throws IOException {
- if (type == Norm.One)
- return getNorm1();
- else if (type == Norm.Frobenius)
- return getFrobenius();
- else if (type == Norm.Infinity)
- return getInfinity();
- else
- return getMaxvalue();
- }
-
- /**
* Returns type of matrix
*/
public String getType() {
@@ -719,50 +504,7 @@
return result;
}
- /**
- * Collect Blocks
- *
- * @param path a input path
- * @param collectionTable the collection table
- * @param blockNum the number of blocks
- * @param bool
- * @throws IOException
- */
- public void collectBlocksMapRed(String path, String collectionTable,
- int blockNum, boolean bool) throws IOException {
- double blocks = Math.pow(blockNum, 0.5);
- if (!String.valueOf(blocks).endsWith(".0"))
- throw new IOException("can't divide.");
- int block_size = (int) blocks;
- Job job = new Job(config, "Blocking MR job" + getPath());
-
- Scan scan = new Scan();
- scan.addFamily(Constants.COLUMNFAMILY);
-
- job.getConfiguration().set(CollectBlocksMapper.BLOCK_SIZE,
- String.valueOf(block_size));
- job.getConfiguration().set(CollectBlocksMapper.ROWS,
- String.valueOf(this.getRows()));
- job.getConfiguration().set(CollectBlocksMapper.COLUMNS,
- String.valueOf(this.getColumns()));
- job.getConfiguration().setBoolean(CollectBlocksMapper.MATRIX_POS, bool);
-
- TableMapReduceUtil.initTableMapperJob(path, scan,
- org.apache.hama.mapreduce.CollectBlocksMapper.class, BlockID.class,
- MapWritable.class, job);
- TableMapReduceUtil.initTableReducerJob(collectionTable,
- org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
/**
* Compute all the eigen values. Note: all the eigen values are collected in
* the "eival:value" column, and the eigen vector of a specified eigen value
Index: src/java/org/apache/hama/matrix/Matrix.java
===================================================================
--- src/java/org/apache/hama/matrix/Matrix.java (revision 942654)
+++ src/java/org/apache/hama/matrix/Matrix.java (working copy)
@@ -196,15 +196,6 @@
public Matrix add(double alpha, Matrix B) throws IOException;
/**
- * C = A*B
- *
- * @param B
- * @return C
- * @throws IOException
- */
- public Matrix mult(Matrix B) throws IOException;
-
- /**
* C = alpha*A*B + C
*
* @param alpha
@@ -216,15 +207,6 @@
public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException;
/**
- * Computes the given norm of the matrix
- *
- * @param type
- * @return norm of the matrix
- * @throws IOException
- */
- public double norm(Norm type) throws IOException;
-
- /**
* Supported matrix-norms.
*/
enum Norm {
Index: src/java/org/apache/hama/matrix/SparseMatrix.java
===================================================================
--- src/java/org/apache/hama/matrix/SparseMatrix.java (revision 942654)
+++ src/java/org/apache/hama/matrix/SparseMatrix.java (working copy)
@@ -25,30 +25,20 @@
import java.util.Map;
import java.util.Random;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.mapreduce.RandomMatrixMapper;
-import org.apache.hama.mapreduce.RandomMatrixReducer;
import org.apache.hama.matrix.algebra.SparseMatrixVectorMultMap;
import org.apache.hama.matrix.algebra.SparseMatrixVectorMultReduce;
import org.apache.hama.util.BytesUtil;
@@ -56,9 +46,6 @@
public class SparseMatrix extends AbstractMatrix implements Matrix {
static private final String TABLE_PREFIX = SparseMatrix.class.getSimpleName();
- static private final Path TMP_DIR = new Path(SparseMatrix.class
- .getSimpleName()
- + "_TMP_dir");
public SparseMatrix(HamaConfiguration conf, int m, int n) throws IOException {
setConfiguration(conf);
@@ -119,68 +106,6 @@
return rand;
}
- public static SparseMatrix random_mapred(HamaConfiguration conf, int m,
- int n, double percent) throws IOException {
- SparseMatrix rand = new SparseMatrix(conf, m, n);
- LOG.info("Create the " + m + " * " + n + " random matrix : "
- + rand.getPath());
-
- Job job = new Job(conf, "random matrix MR job : " + rand.getPath());
- final Path inDir = new Path(TMP_DIR, "in");
- FileInputFormat.setInputPaths(job, inDir);
- job.setMapperClass(RandomMatrixMapper.class);
-
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(MapWritable.class);
-
- job.getConfiguration().setInt("matrix.column", n);
- job.getConfiguration().set("matrix.type", TABLE_PREFIX);
- job.getConfiguration().set("matrix.density", String.valueOf(percent));
-
- job.setInputFormatClass(SequenceFileInputFormat.class);
- final FileSystem fs = FileSystem.get(job.getConfiguration());
- int interval = m / conf.getNumMapTasks();
-
- // generate an input file for each map task
- for (int i = 0; i < conf.getNumMapTasks(); ++i) {
- final Path file = new Path(inDir, "part" + i);
- final IntWritable start = new IntWritable(i * interval);
- IntWritable end = null;
- if ((i + 1) != conf.getNumMapTasks()) {
- end = new IntWritable(((i * interval) + interval) - 1);
- } else {
- end = new IntWritable(m - 1);
- }
- final SequenceFile.Writer writer = SequenceFile.createWriter(fs, job
- .getConfiguration(), file, IntWritable.class, IntWritable.class,
- CompressionType.NONE);
- try {
- writer.append(start, end);
- } finally {
- writer.close();
- }
- System.out.println("Wrote input for Map #" + i);
- }
-
- job.setOutputFormatClass(TableOutputFormat.class);
- job.setReducerClass(RandomMatrixReducer.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, rand.getPath());
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Writable.class);
-
- try {
- job.waitForCompletion(true);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- fs.delete(TMP_DIR, true);
- return rand;
- }
-
@Override
public Matrix add(Matrix B) throws IOException {
// TODO Auto-generated method stub
@@ -299,24 +224,6 @@
return null;
}
- /**
- * Computes the given norm of the matrix
- *
- * @param type
- * @return norm of the matrix
- * @throws IOException
- */
- public double norm(Norm type) throws IOException {
- if (type == Norm.One)
- return getNorm1();
- else if (type == Norm.Frobenius)
- return getFrobenius();
- else if (type == Norm.Infinity)
- return getInfinity();
- else
- return getMaxvalue();
- }
-
@Override
public void setColumn(int column, Vector vector) throws IOException {
// TODO Auto-generated method stub
Index: src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (revision 942654)
+++ src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (working copy)
@@ -1,25 +0,0 @@
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hama.Constants;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.matrix.SubMatrix;
-
-public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
- private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK);
-
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- SubMatrix a = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("a")));
- SubMatrix b = new SubMatrix(value.getValue(COLUMN, Bytes.toBytes("b")));
-
- SubMatrix c = a.mult(b);
- context.write(new BlockID(key.get()), new BytesWritable(c.getBytes()));
- }
-}
Index: src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java (revision 942654)
+++ src/java/org/apache/hama/matrix/algebra/BlockMultReduce.java (working copy)
@@ -1,46 +0,0 @@
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.matrix.SubMatrix;
-import org.apache.hama.util.BytesUtil;
-
-public class BlockMultReduce extends
- TableReducer<BlockID, BytesWritable, ImmutableBytesWritable> {
-
- @Override
- public void reduce(BlockID key, Iterable<BytesWritable> values,
- Context context) throws IOException, InterruptedException {
- SubMatrix s = null;
- for (BytesWritable value : values) {
- SubMatrix b = new SubMatrix(value.getBytes());
- if (s == null) {
- s = b;
- } else {
- s = s.add(b);
- }
- }
-
- int startRow = key.getRow() * s.getRows();
- int startColumn = key.getColumn() * s.getColumns();
-
- for (int i = 0; i < s.getRows(); i++) {
- Put put = new Put(BytesUtil.getRowIndex(i + startRow));
- for (int j = 0; j < s.getColumns(); j++) {
- put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j + startColumn)),
- Bytes.toBytes(s.get(i, j)));
- }
-
- context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key
- .getRow())), put);
- }
- }
-}
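
Once the partial products for one block are summed, the reducer above writes the block back at its global offset. The index arithmetic is just the following (a sketch, not the Hama API):

public class BlockOffsetSketch {
  // Cell (i, j) of block (blockRow, blockColumn) lands at global position
  // (blockRow * blockRows + i, blockColumn * blockColumns + j) in the result.
  static void copyBlock(double[][] block, int blockRow, int blockColumn,
      double[][] result) {
    int startRow = blockRow * block.length;
    int startColumn = blockColumn * block[0].length;
    for (int i = 0; i < block.length; i++) {
      for (int j = 0; j < block[0].length; j++) {
        result[startRow + i][startColumn + j] = block[i][j];
      }
    }
  }
}
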
Index: src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java (revision 942654)
+++ src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java (working copy)
@@ -1,73 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.matrix.DenseMatrix;
-import org.apache.hama.matrix.DenseVector;
-import org.apache.hama.util.BytesUtil;
-
-public class DenseMatrixVectorMultMap extends
- TableMapper<IntWritable, MapWritable> implements Configurable {
- private Configuration conf = null;
- protected DenseVector currVector;
- public static final String ITH_ROW = "ith.row";
- public static final String MATRIX_A = "hama.multiplication.matrix.a";
- private IntWritable nKey = new IntWritable();
-
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- double ithjth = currVector.get(BytesUtil.getRowIndex(key.get()));
- if (ithjth != 0) {
- DenseVector scaled = new DenseVector(value).scale(ithjth);
- context.write(nKey, scaled.getEntries());
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- DenseMatrix matrix_a;
- try {
- matrix_a = new DenseMatrix(new HamaConfiguration(conf), conf.get(MATRIX_A,
- ""));
- int ithRow = conf.getInt(ITH_ROW, 0);
- nKey.set(ithRow);
- currVector = matrix_a.getRow(ithRow);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-}
Index: src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java (revision 942654)
+++ src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java (working copy)
@@ -1,66 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
-import org.apache.hama.matrix.DenseVector;
-import org.apache.hama.util.BytesUtil;
-
-public class DenseMatrixVectorMultReduce extends
- TableReducer<IntWritable, MapWritable, ImmutableBytesWritable> {
-
- @Override
- public void reduce(IntWritable key, Iterable<MapWritable> values,
- Context context) throws IOException, InterruptedException {
- DenseVector sum = new DenseVector();
-
- for (MapWritable value : values) {
- DenseVector nVector = new DenseVector(value);
-
- if (sum.size() == 0) {
- sum.zeroFill(nVector.size());
- sum.add(nVector);
- } else {
- sum.add(nVector);
- }
- }
-
- Put put = new Put(BytesUtil.getRowIndex(key.get()));
- for (Map.Entry<Writable, Writable> e : sum.getEntries().entrySet()) {
- put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
- .valueOf(((IntWritable) e.getKey()).get())), Bytes
- .toBytes(((DoubleWritable) e.getValue()).get()));
- }
-
- context.write(new ImmutableBytesWritable(BytesUtil.getRowIndex(key.get())),
- put);
- }
-}
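
Taken together, the removed map and reduce classes split one per-row job into two phases: the map side emits one A[i][j]-scaled copy of row j of B for every non-zero coefficient, and the reduce side adds those copies element-wise to obtain row i of C. A plain-Java rendering of the two phases, where double[] arrays stand in for the MapWritable-backed vectors:

import java.util.ArrayList;
import java.util.List;

public class RowJobSketch {
  // "Map" phase for row i of A: one scaled copy of B's row j per non-zero A[i][j].
  static List<double[]> mapPhase(double[] rowOfA, double[][] b) {
    List<double[]> scaledRows = new ArrayList<double[]>();
    for (int j = 0; j < rowOfA.length; j++) {
      if (rowOfA[j] == 0) {
        continue;
      }
      double[] scaled = new double[b[j].length];
      for (int k = 0; k < b[j].length; k++) {
        scaled[k] = rowOfA[j] * b[j][k];
      }
      scaledRows.add(scaled);
    }
    return scaledRows;
  }

  // "Reduce" phase: element-wise sum of all scaled rows gives row i of C.
  static double[] reducePhase(List<double[]> scaledRows, int columns) {
    double[] sum = new double[columns];
    for (double[] row : scaledRows) {
      for (int k = 0; k < columns; k++) {
        sum[k] += row[k];
      }
    }
    return sum;
  }
}
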
Index: src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java (revision 942654)
+++ src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java (working copy)
@@ -1,221 +0,0 @@
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hama.Constants;
-import org.apache.hama.util.BytesUtil;
-import org.apache.log4j.Logger;
-
-/** A Catalog class collect all the mr classes to compute the matrix's norm */
-public class MatrixNormMapReduce {
- public final static IntWritable nKey = new IntWritable(-1);
-
- /** Infinity Norm */
- public static class MatrixInfinityNormMapper extends
- TableMapper<IntWritable, DoubleWritable> {
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
-
- double rowSum = 0;
- NavigableMap<byte[], byte[]> v = value
- .getFamilyMap(Constants.COLUMNFAMILY);
- for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
- rowSum += Math.abs(Bytes.toDouble(e.getValue()));
- }
-
- nValue.set(rowSum);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-
- /**
- * Matrix Infinity Norm Reducer
- */
- public static class MatrixInfinityNormReduce extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- static final Logger LOG = Logger.getLogger(MatrixInfinityNormReduce.class);
- private double max = 0;
- private DoubleWritable nValue = new DoubleWritable();
-
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
- for (DoubleWritable val : values) {
- max = Math.max(val.get(), max);
- }
-
- nValue.set(max);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-
- /** One Norm Mapper */
- public static class MatrixOneNormMapper extends
- TableMapper<IntWritable, DoubleWritable> {
- private IntWritable newkey = new IntWritable();
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
-
- NavigableMap<byte[], byte[]> v = value
- .getFamilyMap(Constants.COLUMNFAMILY);
- for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
- newkey.set(BytesUtil.bytesToInt(e.getKey()));
- nValue.set(Bytes.toDouble(e.getValue()));
- context.write(newkey, nValue);
- }
- }
- }
-
- /** One Norm Combiner * */
- public static class MatrixOneNormCombiner extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
-
- double partialColSum = 0;
- for (DoubleWritable val : values) {
- partialColSum += val.get();
- }
-
- nValue.set(partialColSum);
- context.write(key, nValue);
- }
- }
-
- /** One Norm Reducer * */
- public static class MatrixOneNormReducer extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- private double max = 0;
-
- @Override
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
- double colSum = 0;
-
- for (DoubleWritable val : values) {
- colSum += val.get();
- }
-
- max = Math.max(Math.abs(colSum), max);
- }
-
- public void cleanup(Context context) throws IOException,
- InterruptedException {
- context.write(MatrixNormMapReduce.nKey, new DoubleWritable(max));
- }
- }
-
- /** Frobenius Norm Mapper */
- public static class MatrixFrobeniusNormMapper extends
- TableMapper<IntWritable, DoubleWritable> {
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- double rowSqrtSum = 0;
-
- NavigableMap<byte[], byte[]> v = value
- .getFamilyMap(Constants.COLUMNFAMILY);
- for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
- double cellValue = Bytes.toDouble(e.getValue());
- rowSqrtSum += (cellValue * cellValue);
- }
-
- nValue.set(rowSqrtSum);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-
- /** Frobenius Norm Combiner */
- public static class MatrixFrobeniusNormCombiner extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- private double sqrtSum = 0;
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
- for (DoubleWritable val : values) {
- sqrtSum += val.get();
- }
-
- nValue.set(sqrtSum);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-
- /** Frobenius Norm Reducer */
- public static class MatrixFrobeniusNormReducer extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- private double sqrtSum = 0;
-
- @Override
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
- for (DoubleWritable val : values) {
- sqrtSum += val.get();
- }
-
- context.write(MatrixNormMapReduce.nKey, new DoubleWritable(Math
- .sqrt(sqrtSum)));
- }
- }
-
- /** MaxValue Norm Mapper * */
- public static class MatrixMaxValueNormMapper extends
- TableMapper<IntWritable, DoubleWritable> {
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- double max = 0;
-
- NavigableMap<byte[], byte[]> v = value
- .getFamilyMap(Constants.COLUMNFAMILY);
- for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
- double cellValue = Bytes.toDouble(e.getValue());
- max = cellValue > max ? cellValue : max;
- }
-
- nValue.set(max);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-
- /** MaxValue Norm Reducer */
- public static class MatrixMaxValueNormReducer extends
- Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
- private double max = 0;
- private DoubleWritable nValue = new DoubleWritable();
-
- @Override
- public void reduce(IntWritable key, Iterable<DoubleWritable> values,
- Context context) throws IOException, InterruptedException {
- for (DoubleWritable val : values) {
- max = Math.max(val.get(), max);
- }
-
- nValue.set(max);
- context.write(MatrixNormMapReduce.nKey, nValue);
- }
- }
-}
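
All of the removed norm pipelines share the same shape: the map phase reduces each HBase row to per-key doubles, an optional combiner folds partial results, and a single reducer produces the final value. The one-norm pipeline is the most involved, so here is a plain-Java rendering of its three phases (illustration only): the map groups values by column index, the combiner sums partial column sums, and the reducer keeps the maximum absolute column sum, as in MatrixOneNormMapper/Combiner/Reducer above.

import java.util.HashMap;
import java.util.Map;

public class OneNormPipelineSketch {
  static double oneNorm(double[][] matrix) {
    // Map + combine: accumulate a (column index -> column sum) table. In the MR
    // version each mapper/combiner only sees part of the rows, so these are
    // partial sums that the reducer finishes adding up.
    Map<Integer, Double> columnSums = new HashMap<Integer, Double>();
    for (double[] row : matrix) {
      for (int j = 0; j < row.length; j++) {
        double prev = columnSums.containsKey(j) ? columnSums.get(j) : 0;
        columnSums.put(j, prev + row[j]);
      }
    }
    // Reduce: maximum absolute column sum over all columns.
    double max = 0;
    for (double colSum : columnSums.values()) {
      max = Math.max(Math.abs(colSum), max);
    }
    return max;
  }
}
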
Index: src/test/org/apache/hama/Benchmarks.java
===================================================================
--- src/test/org/apache/hama/Benchmarks.java (revision 942646)
+++ src/test/org/apache/hama/Benchmarks.java (working copy)
@@ -1,6 +1,7 @@
package org.apache.hama;
import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.examples.*;
public class Benchmarks {
@@ -12,7 +13,7 @@
HamaConfiguration conf = new HamaConfiguration();
System.out.println("Creating random matrix");
- DenseMatrix rand = DenseMatrix.random_mapred(conf, Integer
+ DenseMatrix rand = RandomMatrix.random_mapred(conf, Integer
.parseInt(args[0]), Integer.parseInt(args[0]));
double start = System.currentTimeMillis();
Index: src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
===================================================================
--- src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (revision 942646)
+++ src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (working copy)
@@ -28,12 +28,12 @@
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.mapreduce.BlockMultMap;
+import org.apache.hama.examples.mapreduce.BlockMultReduce;
import org.apache.hama.io.BlockID;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.DenseVector;
import org.apache.hama.matrix.Matrix;
-import org.apache.hama.matrix.algebra.BlockMultMap;
-import org.apache.hama.matrix.algebra.BlockMultReduce;
import org.apache.hama.util.RandomVariable;
public class TestFileMatrixBlockMult extends HamaCluster {
@@ -162,7 +162,7 @@
job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b);
TableMapReduceUtil.initTableReducerJob(collectionTable,
- org.apache.hama.mapreduce.CollectBlocksReducer.class, job);
+ org.apache.hama.examples.mapreduce.CollectBlocksReducer.class, job);
try {
job.waitForCompletion(true);
Index: src/test/org/apache/hama/examples/TestMatrixMult.java
===================================================================
--- src/test/org/apache/hama/examples/TestMatrixMult.java (revision 0)
+++ src/test/org/apache/hama/examples/TestMatrixMult.java (revision 0)
@@ -0,0 +1,75 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.matrix.Matrix;
+
+public class TestMatrixMult extends HamaCluster {
+ private int SIZE = 8;
+ private Matrix m1;
+ private Matrix m2;
+ private HamaConfiguration conf;
+
+ /**
+ * @throws UnsupportedEncodingException
+ */
+ public TestMatrixMult() throws UnsupportedEncodingException {
+ super();
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ conf = getConf();
+
+ m1 = RandomMatrix.random_mapred(conf, SIZE, SIZE);
+ m2 = RandomMatrix.random_mapred(conf, SIZE, SIZE);
+ }
+
+ /**
+ * Test matrices multiplication
+ *
+ * @throws IOException
+ */
+ public void testMult() throws IOException {
+ Matrix result = MatrixMultiplication.mult(m1, m2);
+
+ assertEquals(result.getRows(), SIZE);
+ assertEquals(result.getColumns(), SIZE);
+
+ Matrix result2 = MatrixMultiplication.mult(m1, m2, 4);
+
+ verifyMultResult(m1, m2, result);
+ verifyMultResult(m1, m2, result2);
+ }
+
+ /**
+ * Verifying multiplication result
+ *
+ * @param m1
+ * @param m2
+ * @param result
+ * @throws IOException
+ */
+ private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
+ throws IOException {
+ double[][] c = new double[SIZE][SIZE];
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ for (int k = 0; k < SIZE; k++) {
+ c[i][k] += m1.get(i, j) * m2.get(j, k);
+ }
+ }
+ }
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ assertTrue((Math.abs(c[i][j] - result.get(i, j)) < .0000001));
+ }
+ }
+ }
+}
Index: src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (revision 942646)
+++ src/test/org/apache/hama/mapreduce/TestBlockMatrixMapReduce.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.hama.HamaCluster;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.log4j.Logger;
+import org.apache.hama.examples.MatrixMultiplication;
public class TestBlockMatrixMapReduce extends HamaCluster {
static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
@@ -39,7 +40,7 @@
DenseMatrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
DenseMatrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
- DenseMatrix c = (DenseMatrix) m1.mult(m2, 16);
+ DenseMatrix c = MatrixMultiplication.mult(m1, m2, 16);
double[][] mem = new double[SIZE][SIZE];
for (int i = 0; i < SIZE; i++) {
Index: src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (revision 942646)
+++ src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (working copy)
@@ -25,12 +25,14 @@
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.SparseMatrix;
import org.apache.log4j.Logger;
+import org.apache.hama.examples.mapreduce.*;
+import org.apache.hama.examples.*;
public class TestRandomMatrixMapReduce extends HamaCluster {
static final Logger LOG = Logger.getLogger(TestRandomMatrixMapReduce.class);
public void testRandomMatrixMapReduce() throws IOException {
- DenseMatrix rand = DenseMatrix.random_mapred(conf, 20, 20);
+ DenseMatrix rand = RandomMatrix.random_mapred(conf, 20, 20);
assertEquals(20, rand.getRows());
assertEquals(20, rand.getColumns());
@@ -42,7 +44,7 @@
rand.close();
- SparseMatrix rand2 = SparseMatrix.random_mapred(conf, 20, 20, 30);
+ SparseMatrix rand2 = RandomMatrix.random_mapred(conf, 20, 20, 30);
assertEquals(20, rand2.getRows());
assertEquals(20, rand2.getColumns());
boolean zeroAppear = false;
Index: src/test/org/apache/hama/matrix/TestAbstractMatrix.java
===================================================================
--- src/test/org/apache/hama/matrix/TestAbstractMatrix.java (revision 942646)
+++ src/test/org/apache/hama/matrix/TestAbstractMatrix.java (working copy)
@@ -5,6 +5,7 @@
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.MatrixNorm;
import org.apache.hama.matrix.Matrix.Norm;
import org.apache.log4j.Logger;
@@ -48,22 +49,22 @@
}
public void normTest(Matrix matrix) throws IOException {
- double norm1 = matrix.norm(Norm.One);
+ double norm1 = MatrixNorm.norm(matrix, Norm.One);
double verify_norm1 = MatrixTestCommon.verifyNorm1(matrix);
gap = norm1 - verify_norm1;
assertTrue(gap < 0.000001 && gap > -0.000001);
- double normInfinity = matrix.norm(Norm.Infinity);
+ double normInfinity = MatrixNorm.norm(matrix, Norm.Infinity);
double verify_normInf = MatrixTestCommon.verifyNormInfinity(matrix);
gap = normInfinity - verify_normInf;
assertTrue(gap < 0.000001 && gap > -0.000001);
- double normFrobenius = matrix.norm(Norm.Frobenius);
+ double normFrobenius = MatrixNorm.norm(matrix, Norm.Frobenius);
double verify_normFrobenius = MatrixTestCommon.verifyNormFrobenius(matrix);
gap = normFrobenius - verify_normFrobenius;
assertTrue(gap < 0.000001 && gap > -0.000001);
- double normMaxValue = matrix.norm(Norm.Maxvalue);
+ double normMaxValue = MatrixNorm.norm(matrix, Norm.Maxvalue);
double verify_normMV = MatrixTestCommon.verifyNormMaxValue(matrix);
gap = normMaxValue - verify_normMV;
assertTrue(gap < 0.000001 && gap > -0.000001);
Index: src/test/org/apache/hama/matrix/TestDenseMatrix.java
===================================================================
--- src/test/org/apache/hama/matrix/TestDenseMatrix.java (revision 942646)
+++ src/test/org/apache/hama/matrix/TestDenseMatrix.java (working copy)
@@ -66,13 +66,6 @@
} catch (IOException e) {
LOG.info(e.toString());
}
-
- try {
- m1.mult(m4);
- fail("Matrix-Mult should be failed while A.columns!=B.rows.");
- } catch (IOException e) {
- LOG.info(e.toString());
- }
double origin = m1.get(1, 1);
m1.add(1, 1, 0.5);
@@ -80,7 +73,6 @@
matrixAdd(m1, m2);
multMatrixAdd(m1, m2, m3);
- matrixMult(m1, m2);
addAlphaMatrix(m1, m2);
getRowColumnVector();
@@ -131,20 +123,6 @@
}
}
- /**
- * Test matrices multiplication
- *
- * @throws IOException
- */
- public void matrixMult(Matrix m1, Matrix m2) throws IOException {
- Matrix result = m1.mult(m2);
-
- assertEquals(result.getRows(), SIZE);
- assertEquals(result.getColumns(), SIZE);
-
- verifyMultResult(m1, m2, result);
- }
-
public void addAlphaMatrix(Matrix m1, Matrix m2) throws IOException {
double value = m1.get(0, 0) + (m2.get(0, 0) * 0.1);
Matrix result = m1.add(0.1, m2);
@@ -227,31 +205,4 @@
x++;
}
}
-
- /**
- * Verifying multiplication result
- *
- * @param m1
- * @param m2
- * @param result
- * @throws IOException
- */
- private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
- throws IOException {
- double[][] c = new double[SIZE][SIZE];
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- for (int k = 0; k < SIZE; k++) {
- c[i][k] += m1.get(i, j) * m2.get(j, k);
- }
- }
- }
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- assertTrue((Math.abs(c[i][j] - result.get(i, j)) < .0000001));
- }
- }
- }
}
Index: src/test/org/apache/hama/matrix/TestMatrixVectorMult.java
===================================================================
--- src/test/org/apache/hama/matrix/TestMatrixVectorMult.java (revision 942646)
+++ src/test/org/apache/hama/matrix/TestMatrixVectorMult.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.log4j.Logger;
+import org.apache.hama.examples.MatrixMultiplication;
public class TestMatrixVectorMult extends HamaCluster {
static final Logger LOG = Logger.getLogger(TestMatrixVectorMult.class);
@@ -57,7 +58,7 @@
}
public void testMatVectorMult() throws IOException {
- DenseMatrix c = (DenseMatrix) m1.mult(m2);
+ DenseMatrix c = MatrixMultiplication.mult(m1, m2);
assertTrue(m1.getRows() == 2);
for (int i = 0; i < c.getRows(); i++) {
Index: src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
===================================================================
--- src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (revision 942646)
+++ src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (working copy)
@@ -29,6 +29,7 @@
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.MatrixMultiplication;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
@@ -66,7 +67,7 @@
public void testEigenSingularValues() throws IOException {
Matrix aT = m1.transpose();
- DenseMatrix aTa = (DenseMatrix) aT.mult(m1);
+ DenseMatrix aTa = MatrixMultiplication.mult(aT, m1);
for (int i = 0; i < m1.getRows(); i++) {
for (int j = 0; j < m1.getRows(); j++) {