Index: examples/src/main/java/org/apache/hama/examples/RandomMatrixGenerator.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/RandomMatrixGenerator.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/RandomMatrixGenerator.java (working copy) @@ -0,0 +1,410 @@ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.FileOutputFormat; +import org.apache.hama.bsp.NullInputFormat; +import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.examples.util.SparseVectorWritable; + +/** + * This class can generate random matrix. It uses {@link MyGenerator}. You can + * specify different options in command line. {@link parseArgs} for more info. + * Option for symmetric matrices is not supported yet. Currently it implements + * row-wise logic, which is usable for {@link SpMV} + */ +public class RandomMatrixGenerator { + private static Path TMP_OUTPUT = new Path("/tmp/matrix-gen-" + + System.currentTimeMillis()); + + private static HamaConfiguration conf; + + public static String requestedBspTasksString = "bsptask.count"; + + public static String sparsityString = "randomgenerator.sparsity"; + + public static String rowsString = "randomgenerator.rows"; + + public static String columnsString = "randomgenerator.columns"; + + public static String outputString = "randomgenerator.output"; + + /* + * IMPORTANT NOTE: I tried to use enums for counters, but I couldn't access + * them after bsp computations. So I preferred static counter. It sould be + * checked. + */ + private static Counter totalCounter; + + public static boolean configurationNull() { + return conf == null; + } + + public static void setConfiguration(HamaConfiguration configuration) { + conf = configuration; + } + + public static int getRequestedBspTasksCount() { + return conf.getInt(requestedBspTasksString, -1); + } + + public static void setRequestedBspTasksCount(int requestedBspTasksCount) { + conf.setInt(requestedBspTasksString, requestedBspTasksCount); + } + + public static float getSparsity() { + return conf.getFloat(sparsityString, 0.1f); + } + + public static void setSparsity(float sparsity) { + conf.setFloat(sparsityString, sparsity); + } + + public static int getRows() { + return conf.getInt(rowsString, 10); + } + + public static void setRows(int rows) { + conf.setInt(rowsString, rows); + } + + public static int getColumns() { + return conf.getInt(columnsString, 10); + } + + public static void setColumns(int columns) { + conf.setInt(columnsString, columns); + } + + public static String getOutputPath() { + return conf.get(outputString); + } + + public static void setOutputPath(String outputPath) { + conf.set(outputString, outputPath); + } + + public static int getGeneratedCount() { + return (int) totalCounter.getValue(); + } + + /** + * Now function parses unix-like command line. Format: -option=value Options: + * -o - Output path for generator -r - Rows count -c - Columns count -s - + * sparsity -n - requested bsp task number Example: Assign 5x5 matrix with 0.2 + * sparsity and request 5 bsp tasks. -r=5 -c=5 -s=0.2 -n=5 + **/ + public static void parseArgs(String[] args) { + try { + String[] arr; + String option, value; + for (String arg : args) { + arr = arg.split("="); + try { + option = arr[0]; + value = arr[1]; + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException( + "Mallformed option. Usage: -option=value. Current value: " + arg); + } + + if (option.equals("-o")) { + RandomMatrixGenerator.setOutputPath(value); + continue; + } + + if (option.equals("-n")) { + try { + int requestedBspTasksCount = Integer.parseInt(value); + if (requestedBspTasksCount < 0) + throw new IllegalArgumentException( + "The number of requested bsp tasks can't be negative. Actual value: " + + String.valueOf(requestedBspTasksCount)); + RandomMatrixGenerator + .setRequestedBspTasksCount(requestedBspTasksCount); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The format of requested bsp tasks is int. Can not parse value: " + + value); + } + continue; + } + + if (option.equals("-r")) { + try { + int rows = Integer.parseInt(value); + if (rows < 0) + throw new IllegalArgumentException( + "The number of matrix rows can't be negative. Actual value: " + + String.valueOf(rows)); + RandomMatrixGenerator.setRows(rows); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The format of matrix rows is int. Can not parse value: " + + value); + } + continue; + } + + if (option.equals("-c")) { + try { + int columns = Integer.parseInt(value); + if (columns < 0) + throw new IllegalArgumentException( + "The number of matrix columns can't be negative. Actual value: " + + String.valueOf(columns)); + RandomMatrixGenerator.setColumns(columns); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The format of matrix columns is int. Can not parse value: " + + value); + } + continue; + } + + if (option.equals("-s")) { + try { + float sparsity = Float.parseFloat(value); + if (sparsity < 0.0 || sparsity > 1.0) + throw new IllegalArgumentException( + "Sparsity must be between 0.0 and 1.0. Actual value: " + + String.valueOf(sparsity)); + RandomMatrixGenerator.setSparsity(sparsity); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The format of sparsity is float. Can not parse value: " + + value); + } + continue; + } + + throw new IllegalArgumentException("Unknown option: " + option + value); + + } + } catch (Exception e) { + StringBuilder st = new StringBuilder(); + for (String arg : args) + st.append(" " + arg); + throw new IllegalArgumentException( + "Unexpected error in command line. cmd: " + st.toString() + + ". Message: " + e.getMessage()); + } + } + + /** + * Main function which gives an ability to start generator with command line. + * + * @param args + * command line + */ + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + if (RandomMatrixGenerator.configurationNull()) + RandomMatrixGenerator.setConfiguration(conf); + parseArgs(args); + startTask(); + } + + /** + * Alternative way to start generator. requestedBstTasksCount and outputPath + * parameters are optional. + */ + public static void main(int rows, int columns, float sparsity, + Integer requestedBspTasksCount, String outputPath) throws IOException, + InterruptedException, ClassNotFoundException { + + HamaConfiguration conf = new HamaConfiguration(); + if (RandomMatrixGenerator.configurationNull()) + RandomMatrixGenerator.setConfiguration(conf); + + if (rows < 0) + throw new IllegalArgumentException( + "The number of matrix rows can't be negative. Actual value: " + + String.valueOf(rows)); + + if (columns < 0) + throw new IllegalArgumentException( + "The number of matrix columns can't be negative. Actual value: " + + String.valueOf(columns)); + + if (sparsity < 0.0 || sparsity > 1.0) + throw new IllegalArgumentException( + "Sparsity must be between 0.0 and 1.0. Actual value: " + + String.valueOf(sparsity)); + + if (requestedBspTasksCount != null && requestedBspTasksCount < 0) + throw new IllegalArgumentException( + "The number of requested bsp tasks can't be negative. Actual value: " + + String.valueOf(requestedBspTasksCount)); + + setRows(rows); + setColumns(columns); + setSparsity(sparsity); + if (requestedBspTasksCount != null) + setRequestedBspTasksCount(requestedBspTasksCount); + if (outputPath != null) + setOutputPath(outputPath); + + startTask(); + } + + /** + * Method which actually starts generator. + */ + private static void startTask() throws IOException, InterruptedException, + ClassNotFoundException { + totalCounter = new Counter() { + + }; + BSPJob bsp = new BSPJob(conf, RandomMatrixGenerator.class); + bsp.setJobName("Random Matrix Generator"); + /* + * Generator doesn't reads input. the output will be presented as matrix + * rows with row index key. TextOutputFormat is for readability it will be + * replaces by SequenceFileOutputFormat. + */ + bsp.setBspClass(MyGenerator.class); + bsp.setInputFormat(NullInputFormat.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setOutputKeyClass(IntWritable.class); + bsp.setOutputValueClass(SparseVectorWritable.class); + String pathString = getOutputPath(); + Path path = TMP_OUTPUT; + if (pathString != null) + path = new Path(pathString); + else + setOutputPath(TMP_OUTPUT.toString()); + FileOutputFormat.setOutputPath(bsp, path); + + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(true); + + if (RandomMatrixGenerator.getRequestedBspTasksCount() != -1) { + bsp.setNumBspTask(RandomMatrixGenerator.getRequestedBspTasksCount()); + } else { + bsp.setNumBspTask(cluster.getMaxTasks()); + } + + long startTime = System.currentTimeMillis(); + if (bsp.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds. Output is in " + getOutputPath().toString()); + } + + } + + /** + * Class which currently implements row-wise logic. In case of sparsity > 0.5 + * can get not exact number of generated items, as expected. But it was made + * for efficiency. + */ + public static class MyGenerator + extends + BSP { + public static final Log LOG = LogFactory.getLog(MyGenerator.class); + + private static int rows, columns; + private static float sparsity; + private static int remainder, quotient, needed; + private static int peerCount = 1; + private static Random rand; + private static double criticalSparsity = 0.5; + + @Override + public void setup( + BSPPeer peer) + throws IOException { + sparsity = getSparsity(); + rows = getRows(); + columns = getColumns(); + int total = rows * columns; + peerCount = peer.getNumPeers(); + rand = new Random(); + needed = (int) (total * sparsity); + remainder = needed % rows; + quotient = needed / rows; + } + + /** + * The algorithm is as follows: each peer counts, how many items it needs to + * generate, after that it uses algorithms for sparse matrices if sparsity < + * 0.5 and algorithm for dense matrices otherwise. NOTE: in case of sparsity + * > 0.5 number of generated items can differ from expected count. + */ + @Override + public void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + List peerNamesList = Arrays.asList(peer.getAllPeerNames()); + int peerIndex = peerNamesList.indexOf(peer.getPeerName()); + + HashSet createdIndeces = new HashSet(); + List rowIndeces = new ArrayList(); + int tmpIndex = peerIndex; + while (tmpIndex < rows) { + rowIndeces.add(tmpIndex); + tmpIndex += peerCount; + } + + for (int rowIndex : rowIndeces) { + SparseVectorWritable row = new SparseVectorWritable(); + row.setSize(columns); + createdIndeces.clear(); + int needsToGenerate = quotient; + if (rowIndex < remainder) + needsToGenerate++; + if (sparsity < criticalSparsity) { + // algorithm for sparse matrices. + while (createdIndeces.size() < needsToGenerate) { + int index = (int) (rand.nextDouble() * columns); + if (!createdIndeces.contains(index)) { + totalCounter.increment(1L); + double value = rand.nextDouble(); + row.addCell(index, value); + createdIndeces.add(index); + } + } + } else { + // algorithm for dense matrices + for (int i = 0; i < columns; i++) + if (rand.nextDouble() < sparsity) { + totalCounter.increment(1L); + double value = rand.nextDouble(); + row.addCell(i, value); + } + } + /* + * IMPORTANT: Maybe some optimization can be performed here in case of + * very sparse matrices with empty rows. But I am confused: how to store + * number of non-zero rows with saving partitioning by rows in {@link SpMV}. + */ + // if (row.getSize() > 0) + peer.write(new IntWritable(rowIndex), row); + } + } + } + +} \ No newline at end of file Index: examples/src/main/java/org/apache/hama/examples/SpMV.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SpMV.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/SpMV.java (working copy) @@ -0,0 +1,313 @@ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.Counters.Counter; +import org.apache.hama.bsp.FileOutputFormat; +import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.examples.util.DenseVectorWritable; +import org.apache.hama.examples.util.SparseVectorWritable; +import org.apache.hama.examples.util.WritableUtil; +import org.apache.hama.util.KeyValuePair; + +/** + * Sparse matrix vector multiplication. Currently it uses row-wise access. + * Assumptions: 1) each peer should have copy of input vector for efficient + * operations. 2) row-wise implementation is good because we don't need to care + * about communication 3) the main way to improve performance - create custom + * Partitioner + */ +public class SpMV { + + private static HamaConfiguration conf; + private static final String outputPathString = "spmv.outputpath"; + private static final String resultPathString = "spmv.resultpath"; + private static final String inputMatrixPathString = "spmv.inputmatrixpath"; + private static final String inputVectorPathString = "spmv.inputvectorpath"; + private static String requestedBspTasksString = "bsptask.count"; + private static final String spmvSuffix = "/spmv/"; + private static final String intermediate = "/part"; + + private static Counter rowCounter; + + public static HamaConfiguration getConf() { + if (conf == null) + conf = new HamaConfiguration(); + return conf; + } + + public static void setConfiguration(HamaConfiguration configuration) { + conf = configuration; + } + + public static String getOutputPath() { + return getConf().get(outputPathString, null); + } + + public static void setOutputPath(String outputPath) { + Path path = new Path(outputPath); + path = path.suffix(intermediate); + getConf().set(outputPathString, path.toString()); + } + + public static String getResultPath() { + return getConf().get(resultPathString, null); + } + + private static void setResultPath(String resultPath) { + getConf().set(resultPathString, resultPath); + } + + public static String getInputMatrixPath() { + return getConf().get(inputMatrixPathString, null); + } + + public static void setInputMatrixPath(String inputPath) { + getConf().set(inputMatrixPathString, inputPath); + } + + public static String getInputVectorPath() { + return getConf().get(inputVectorPathString, null); + } + + public static void setInputVectorPath(String inputPath) { + getConf().set(inputVectorPathString, inputPath); + } + + public static int getRequestedBspTasksCount() { + return getConf().getInt(requestedBspTasksString, -1); + } + + public static void setRequestedBspTasksCount(int requestedBspTasksCount) { + getConf().setInt(requestedBspTasksString, requestedBspTasksCount); + } + + private static String generateOutPath() { + HamaConfiguration conf = SpMV.conf; + if (conf == null) + conf = new HamaConfiguration(); + String prefix = conf.get("hadoop.tmp.dir", "/tmp"); + String pathString = prefix + spmvSuffix + System.currentTimeMillis(); + return pathString; + } + + /** + * Function parses Unix-like command line which consists of -option=value + * pairs. Possible options: -im : path for input file matrix; -iv : path for + * input file vector; -o : optional path for output file for dense vector; -n + * : optional requested number of bsp peers. + */ + private static void parseArgs(String[] args) { + try { + String[] arr; + String option, value; + for (String arg : args) { + arr = arg.split("="); + try { + option = arr[0]; + value = arr[1]; + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException( + "Mallformed option. Usage: -option=value. Current value: " + arg); + } + if (option.equals("-im")) { + SpMV.setInputMatrixPath(value); + continue; + } + + if (option.equals("-iv")) { + SpMV.setInputVectorPath(value); + continue; + } + + if (option.equals("-o")) { + SpMV.setOutputPath(value); + continue; + } + + if (option.equals("-n")) { + try { + int taskCount = Integer.parseInt(value); + if (taskCount < 0) + throw new IllegalArgumentException( + "The number of requested tasks can't be negative. Actual value: " + + String.valueOf(taskCount)); + SpMV.setRequestedBspTasksCount(taskCount); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The format of requested task count is int. Can not parse value: " + + value); + } + continue; + } + + throw new IllegalArgumentException("Unknown option: " + option + value); + + } + } catch (Exception e) { + StringBuilder st = new StringBuilder(); + for (String arg : args) + st.append(" " + arg); + throw new IllegalArgumentException( + "Unexpected error in command line. cmd: " + st.toString() + + ". Message: " + e.getMessage()); + } + } + + /** + * This method gives opportunity to start SpMV with command line. + */ + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + parseArgs(args); + startTask(); + } + + /** + * Alternative way to start SpMV task. {@code output} and {@code output} and + * {@code requestedBspTaskCount} parameters can be null. {@code output} can be + * generated and {@code requestedBspTaskCount} will be setted to maximum. + */ + public static void main(Path inputMatrix, Path inputVector, Path output, + Integer requestedBspTaskCount) throws IOException, InterruptedException, + ClassNotFoundException { + if (inputMatrix == null) + throw new IllegalArgumentException( + "Input path for SpMV matrix can't be null"); + if (inputVector == null) + throw new IllegalArgumentException( + "Input path for SpMV vector can't be null"); + if (requestedBspTaskCount != null && requestedBspTaskCount < 1) + throw new IllegalArgumentException( + "Number of requested bsp tasks is incorrect. It must be above zero. Actual value is " + + requestedBspTaskCount); + setInputMatrixPath(inputMatrix.toString()); + setInputVectorPath(inputVector.toString()); + setOutputPath(output.toString()); + setRequestedBspTasksCount(requestedBspTaskCount); + startTask(); + } + + /** + * Method which actually starts SpMV. + */ + private static void startTask() throws IOException, InterruptedException, + ClassNotFoundException { + if (getConf() == null) + setConfiguration(new HamaConfiguration()); + rowCounter = new Counter() { + + }; + BSPJob bsp = new BSPJob(conf, SpMV.class); + bsp.setJobName("Sparse matrix vector multiplication"); + bsp.setBspClass(SpMVExecutor.class); + /* + * Input matrix is presented as pairs of integer and {@ link + * SparseVectorWritable}. Output is pairs of integer and double + */ + bsp.setInputFormat(SequenceFileInputFormat.class); + bsp.setOutputKeyClass(IntWritable.class); + bsp.setOutputValueClass(DoubleWritable.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setInputPath(new Path(getInputMatrixPath())); + + if (getOutputPath() == null) + setOutputPath(generateOutPath()); + // FIXME check this logic. + FileOutputFormat.setOutputPath(bsp, new Path(getOutputPath())); + + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(true); + + if (getRequestedBspTasksCount() != -1) { + bsp.setNumBspTask(getRequestedBspTasksCount()); + } else { + bsp.setNumBspTask(cluster.getMaxTasks()); + } + + long startTime = System.currentTimeMillis(); + if (bsp.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds."); + convertToDenseVector(); + System.out.println("Result is in " + getResultPath()); + } else { + setResultPath(null); + } + } + + /** + * IMPORTANT: This can be a bottle neck. Problem can be here{@core + * WritableUtil.convertSpMVOutputToDenseVector()} + */ + private static void convertToDenseVector() throws IOException { + WritableUtil util = new WritableUtil(); + int size = (int) rowCounter.getValue(); + String resultPath = util.convertSpMVOutputToDenseVector(getOutputPath(), + getConf(), size); + setResultPath(resultPath); + } + + /** + * This class performs sparse matrix vector multiplication. u = m * v. + */ + private static class SpMVExecutor + extends + BSP { + private DenseVectorWritable v; + + /** + * Each peer reads input dense vector. + */ + @Override + public void setup( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + // reading input vector, which represented as matrix row + WritableUtil util = new WritableUtil(); + v = new DenseVectorWritable(); + util.readFromFile(getInputVectorPath(), v, conf); + } + + /** + * Local inner product computation and output. + */ + @Override + public void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + KeyValuePair row = null; + while ((row = peer.readNext()) != null) { + // it will be needed in conversion of output to result vector + rowCounter.increment(1L); + int key = row.getKey().get(); + int sum = 0; + SparseVectorWritable mRow = row.getValue(); + if (v.getSize() != mRow.getSize()) + throw new RuntimeException("Matrix row with index = " + key + + " is not consistent with input vector. Row size = " + + mRow.getSize() + " vector size = " + v.getSize()); + List mIndeces = mRow.getIndeces(); + List mValues = mRow.getValues(); + for (int i = 0; i < mIndeces.size(); i++) + sum += v.get(mIndeces.get(i)) * mValues.get(i); + peer.write(new IntWritable(key), new DoubleWritable(sum)); + } + peer.sync(); + } + } +} Index: examples/src/main/java/org/apache/hama/examples/util/DenseVectorWritable.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/DenseVectorWritable.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/util/DenseVectorWritable.java (working copy) @@ -0,0 +1,68 @@ +package org.apache.hama.examples.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.examples.SpMV; + +/** + * This class represents dense vector. It will improve memory consumption up to + * two times in comparison to {@link SparseVectorWritable} in case of vectors + * which sparsity is close to 1. Internally represents vector values as array. + * Can be used in {@link SpMV} for representation of input and output vector. + */ +public class DenseVectorWritable implements Writable { + + private double values[]; + + public DenseVectorWritable() { + + } + + public int getSize() { + return values.length; + } + + public void setSize(int size) { + values = new double[size]; + } + + public double get(int index) { + return values[index]; + } + + public void addCell(int index, double value) { + values[index] = value; + } + + @Override + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + int len = in.readInt(); + setSize(size); + for (int i = 0; i < len; i++) { + int index = in.readInt(); + double value = in.readDouble(); + values[index] = value; + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(getSize()); + out.writeInt(getSize()); + for (int i = 0; i < getSize(); i++) { + out.writeInt(i); + out.writeDouble(values[i]); + } + } + + @Override + public String toString() { + return "values=" + Arrays.toString(values); + } + +} Index: examples/src/main/java/org/apache/hama/examples/util/SparseVectorWritable.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/SparseVectorWritable.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/util/SparseVectorWritable.java (working copy) @@ -0,0 +1,83 @@ +package org.apache.hama.examples.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.examples.SpMV; + +/** + * This class represents sparse vector. It will give improvement in memory + * consumption in case of vectors which sparsity is close to zero. Can be used + * in {@link SpMV} for representing input matrix rows efficiently. Internally + * represents values as list of indeces and list of values. + */ +public class SparseVectorWritable implements Writable { + + private Integer size; + private List indeces; + private List values; + + public SparseVectorWritable() { + indeces = new ArrayList(); + values = new ArrayList(); + } + + public void addCell(int index, double value) { + indeces.add(index); + values.add(value); + } + + public void setSize(int size) { + this.size = size; + } + + public int getSize() { + if (size != null) + return size; + return indeces.size(); + } + + public List getIndeces() { + return indeces; + } + + public List getValues() { + return values; + } + + @Override + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + int len = in.readInt(); + setSize(size); + for (int i = 0; i < len; i++) { + int index = in.readInt(); + double value = in.readDouble(); + this.addCell(index, value); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(getSize()); + out.writeInt(indeces.size()); + for (int i = 0; i < indeces.size(); i++) { + out.writeInt(indeces.get(i)); + out.writeDouble(values.get(i)); + } + } + + + @Override + public String toString() { + StringBuilder st = new StringBuilder(); + for (int i = 0; i < indeces.size(); i++) + st.append("(" + indeces.get(i) + " " + values.get(i) + ") "); + return st.toString(); + } + +} Index: examples/src/main/java/org/apache/hama/examples/util/WritableUtil.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/WritableUtil.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/util/WritableUtil.java (working copy) @@ -0,0 +1,162 @@ +package org.apache.hama.examples.util; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.examples.SpMV; +import org.apache.hama.examples.SpMVTest; + +/** + * This class is supposed to hide some operations for converting matrices and + * vectors from file format to in-memory format. Also contains method for + * converting output of SpMV task to {@link DenseVectorWritable} Most of methods + * are only needed for test purposes. + */ +public class WritableUtil { + + /** + * This method gives the ability to write matrix from memory to file. It + * should be used with small matrices and for test purposes only. + * + * @param pathString + * path to file where matrix will be writed. + * @param conf + * configuration + * @param matrix + * map of row indeces and values presented as {@link Writable} + * @throws IOException + */ + public void writeMatrix(String pathString, Configuration conf, + Map matrix) throws IOException { + boolean inited = false; + FileSystem fs = FileSystem.get(conf); + SequenceFile.Writer writer = null; + try { + for (Integer index : matrix.keySet()) { + IntWritable key = new IntWritable(index); + Writable value = matrix.get(index); + if (!inited) { + writer = new SequenceFile.Writer(fs, conf, new Path(pathString), + IntWritable.class, value.getClass()); + inited = true; + } + writer.append(key, value); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (writer != null) + writer.close(); + } + + } + + /** + * This method is used to read vector from specified path in {@link SpMVTest}. + * For test purposes only. + * + * @param pathString + * input path for vector + * @param result + * instanse of vector writable which should be filled. + * @param conf + * configuration + * @throws IOException + */ + public void readFromFile(String pathString, Writable result, + Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + SequenceFile.Reader reader = null; + try { + reader = new SequenceFile.Reader(fs, new Path(pathString), conf); + IntWritable key = new IntWritable(); + reader.next(key, result); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (reader != null) + reader.close(); + } + } + + /** + * This method is used to write vector from memory to specified path. + * + * @param pathString + * output path + * @param result + * instance of vector to be writed + * @param conf + * configuration + * @throws IOException + */ + public void writeToFile(String pathString, Writable result, Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + SequenceFile.Writer writer = null; + try { + writer = new SequenceFile.Writer(fs, conf, new Path(pathString), + IntWritable.class, result.getClass()); + IntWritable key = new IntWritable(); + writer.append(key, result); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (writer != null) + writer.close(); + } + } + + /** + * SpMV produces a file, which contains result dense vector in format of pairs + * of integer and double. The aim of this method is to convert SpMV output to + * format usable in subsequent computation - dense vector. It can be usable + * for iterative solvers. IMPORTANT: currently it is used in {@link SpMV}. It + * can be a bottle neck, because all input needs to be stored in memory. + * + * @param SpMVoutputPathString + * output path, which represents directory with part files. + * @param conf + * configuration + * @param size + * size of generated result vector. retrieved from counter. + * @return path to output vector. + * @throws IOException + */ + public String convertSpMVOutputToDenseVector(String SpMVoutputPathString, + Configuration conf, int size) throws IOException { + DenseVectorWritable result = new DenseVectorWritable(); + result.setSize(size); + FileSystem fs = FileSystem.get(conf); + Path SpMVOutputPath = new Path(SpMVoutputPathString); + Path resultOutputPath = SpMVOutputPath.getParent().suffix("/result"); + FileStatus[] stats = fs.listStatus(SpMVOutputPath); + for (FileStatus stat : stats) { + String filePath = stat.getPath().toUri().getPath(); + SequenceFile.Reader reader = null; + fs.open(new Path(filePath)); + try { + reader = new SequenceFile.Reader(fs, new Path(filePath), conf); + IntWritable key = new IntWritable(); + DoubleWritable value = new DoubleWritable(); + while (reader.next(key, value)) + result.addCell(key.get(), value.get()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (reader != null) + reader.close(); + } + } + writeToFile(resultOutputPath.toString(), result, conf); + return resultOutputPath.toString(); + } +} Index: examples/src/test/java/org/apache/hama/examples/RandomMatrixGeneratorTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/RandomMatrixGeneratorTest.java (revision 0) +++ examples/src/test/java/org/apache/hama/examples/RandomMatrixGeneratorTest.java (working copy) @@ -0,0 +1,141 @@ +package org.apache.hama.examples; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.examples.RandomMatrixGenerator; +import org.apache.hama.examples.util.SparseVectorWritable; +import org.junit.Test; + +public class RandomMatrixGeneratorTest { + + private class MatrixReader { + private String pathString; + + public MatrixReader(String pathString) { + this.pathString = pathString; + } + + public void read() throws IOException { + HamaConfiguration conf = new HamaConfiguration(); + Path dir = new Path(pathString); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stats = fs.listStatus(dir); + for (FileStatus stat : stats) { + String filePath = stat.getPath().toUri().getPath(); // gives directory + // name + SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path( + filePath), conf); + IntWritable key = new IntWritable(); + SparseVectorWritable value = new SparseVectorWritable(); + while (reader.next(key, value)) { + System.out.print(key.toString()); + System.out.println(value.toString()); + } + reader.close(); + } + + } + } + + // @Test + public void testRandomMatrixGeneratorEmptyArgs() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[0]); + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } + + // @Test + public void testRandomMatrixGeneratorIncorrectArgs() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=200", "-r=200", "-foo=bar", + "-s=0.1" }); + fail("Matrix generator should fail because of invalid arguments."); + } catch (Exception e) { + // everything ok + } + } + + // @Test + public void testRandomMatrixGeneratorIncorrectArgs1() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=-200", "-r=200" }); + fail("Matrix generator should fail because of invalid arguments."); + } catch (Exception e) { + // everything ok + } + } + + // @Test + public void testRandomMatrixGeneratorIncorrectArgs2() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=200", "-r=200", "-s=#" }); + fail("Matrix generator should fail because of invalid arguments."); + } catch (Exception e) { + // everything ok + } + } + + @Test + public void testRandomMatrixGeneratorSmallSparse() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=5", "-r=5", "-s=0.3", + "-n=4" }); + System.out.println("Generated count = " + + RandomMatrixGenerator.getGeneratedCount()); + String outputPath = RandomMatrixGenerator.getOutputPath(); + MatrixReader reader = new MatrixReader(outputPath); + reader.read(); + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } + + // @Test + public void testRandomMatrixGeneratorLargeSparse() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=10000", "-r=10000", + "-s=0.1", "-n=4" }); + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } + + // @Test + public void testRandomMatrixGeneratorSmallDense() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=4", "-r=4", "-s=0.8", + "-n=4" }); + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } + + // @Test + public void testRandomMatrixGeneratorLargeDense() { + try { + RandomMatrixGenerator.setConfiguration(null); + RandomMatrixGenerator.main(new String[] { "-c=200", "-r=200", "-s=0.8", + "-n=4" }); + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } + +} Index: examples/src/test/java/org/apache/hama/examples/SpMVTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/SpMVTest.java (revision 0) +++ examples/src/test/java/org/apache/hama/examples/SpMVTest.java (working copy) @@ -0,0 +1,112 @@ +package org.apache.hama.examples; + +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Writable; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.examples.RandomMatrixGenerator; +import org.apache.hama.examples.SpMV; +import org.apache.hama.examples.util.DenseVectorWritable; +import org.apache.hama.examples.util.SparseVectorWritable; +import org.apache.hama.examples.util.WritableUtil; +import org.junit.Before; +import org.junit.Test; + +/** + * This class is test cases for {@link SpMV}. It will contain simple hand + * calculated cases, and cases of different matrix and vector sizes given with + * help of {@link RandomMatrixGenerator} + */ +public class SpMVTest { + private HamaConfiguration conf; + private FileSystem fs; + private String baseDir; + + @Before + public void prepare() throws IOException { + conf = new HamaConfiguration(); + fs = FileSystem.get(conf); + baseDir = fs.getHomeDirectory().toString() + "/spmv"; + } + + /** + * Simple test. multiplying + * [1 0 6 0] [2] [38] + * [0 4 0 0] * [3] = [12] + * [0 2 3 0] [6] [24] + * [3 0 0 5] [1] [11] + */ + @Test + public void simpleSpMVTest() { + try { + HamaConfiguration conf = new HamaConfiguration(); + WritableUtil writableUtil = new WritableUtil(); + String testDir = "/simple/"; + int size = 4; + + // creating test matrix + HashMap inputMatrix = new HashMap(); + SparseVectorWritable vector0 = new SparseVectorWritable(); + vector0.setSize(size); + vector0.addCell(0, 1); + vector0.addCell(2, 6); + SparseVectorWritable vector1 = new SparseVectorWritable(); + vector1.setSize(size); + vector1.addCell(1, 4); + SparseVectorWritable vector2 = new SparseVectorWritable(); + vector2.setSize(size); + vector2.addCell(1, 2); + vector2.addCell(2, 3); + SparseVectorWritable vector3 = new SparseVectorWritable(); + vector3.setSize(size); + vector3.addCell(0, 3); + vector3.addCell(3, 5); + inputMatrix.put(0, vector0); + inputMatrix.put(1, vector1); + inputMatrix.put(2, vector2); + inputMatrix.put(3, vector3); + String matrixPath = baseDir + testDir + "inputMatrix"; + writableUtil.writeMatrix(matrixPath, conf, inputMatrix); + + HashMap inputVector = new HashMap(); + DenseVectorWritable vector = new DenseVectorWritable(); + vector.setSize(size); + vector.addCell(0, 2); + vector.addCell(1, 3); + vector.addCell(2, 6); + vector.addCell(3, 1); + inputVector.put(0, vector); + String vectorPath = baseDir + testDir + "inputVector"; + writableUtil.writeMatrix(vectorPath, conf, inputVector); + + String outputPath = baseDir + testDir; + SpMV.setRequestedBspTasksCount(4); + SpMV.setOutputPath(outputPath); + SpMV.setInputMatrixPath(matrixPath); + SpMV.setInputVectorPath(vectorPath); + SpMV.main(new String[0]); + + String resultPath = SpMV.getResultPath(); + DenseVectorWritable result = new DenseVectorWritable(); + writableUtil.readFromFile(resultPath, result, conf); + + double expected[] = { 38, 12, 24, 11 }; + if (result.getSize() != size) + throw new Exception("Incorrect size of output vector"); + for (int i = 0; i < result.getSize(); i++) + if ((result.get(i) - expected[i]) < 0.01) + expected[i] = 0; + + for (int i = 0; i < expected.length; i++) + if (expected[i] != 0) + throw new Exception("Result doesn't meets expectations"); + + } catch (Exception e) { + fail(e.getLocalizedMessage()); + } + } +}