Index: src/java/org/apache/hama/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/AbstractMatrix.java (revision 788280)
+++ src/java/org/apache/hama/AbstractMatrix.java (working copy)
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.algebra.JacobiEigenValue;
 import org.apache.hama.algebra.MatrixNormMapRed;
 import org.apache.hama.algebra.TransposeMap;
 import org.apache.hama.algebra.TransposeReduce;
@@ -146,6 +147,16 @@
     this.tableDesc.addFamily(new HColumnDescriptor(Bytes
         .toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
         Integer.MAX_VALUE, HConstants.FOREVER, false));
+    // the following families are used in the JacobiEigenValue computation
+    this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EI), 1, CompressionType.NONE, false, false,
+        Integer.MAX_VALUE, HConstants.FOREVER, false));
+    this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EICOL), 10, CompressionType.NONE, false, false,
+        Integer.MAX_VALUE, HConstants.FOREVER, false));
+    this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EIVEC), 10, CompressionType.NONE, false, false,
+        Integer.MAX_VALUE, HConstants.FOREVER, false));
 
     LOG.info("Initializing the matrix storage.");
     this.admin.createTable(this.tableDesc);
Index: src/java/org/apache/hama/algebra/JacobiEigenValue.java
===================================================================
--- src/java/org/apache/hama/algebra/JacobiEigenValue.java (revision 0)
+++ src/java/org/apache/hama/algebra/JacobiEigenValue.java (revision 0)
@@ -0,0 +1,574 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.HTableInputFormatBase;
+import org.apache.hama.mapred.HTableRecordReaderBase;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A catalog class that collects all the map/reduce classes used to compute a
+ * matrix's eigenvalues.
+ */
+public class JacobiEigenValue {
+
+  /** a copy of the original matrix,
+   *  collected in the "eicol" column family **/
+  public static final String EICOL = "eicol:";
+  /** a column family collecting all values and statuses used during computation **/
+  public static final String EI = "eival:";
+  /** a column collecting all the eigenvalues **/
+  public static final String EIVAL = EI + "value";
+  /** a column indicating whether an eigenvalue has been changed **/
+  public static final String EICHANGED = EI + "changed";
+  /** a column holding the index of the max absolute value in each row **/
+  public static final String EIIND = EI + "ind";
+  /** a matrix collecting all the eigenvectors **/
+  public static final String EIVEC = "eivec:";
+  public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+  /** parameters for the pivot **/
+  public static final String PIVOTROW = "hama.jacobi.pivot.row";
+  public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+  public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+  public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+  static final Log LOG = LogFactory.getLog(JacobiEigenValue.class);
+
+  /**
+   * The matrix is modified while the eigenvalues are computed, so a new copy
+   * is created to keep the original matrix intact. To reduce network
+   * transfer, the "column" family of the original matrix is copied into an
+   * "eicol" family; all subsequent modifications are done on "eicol".
+   *
+   * The output eigenvector array "eivec", the output eigenvalue array
+   * "eival:value", and the temporary status arrays "eival:changed" and
+   * "eival:ind" are created as well.
+   *
+   * "eival:state" records the rotation state of the matrix.
+   */
+  public static class InitMapper extends MapReduceBase implements
+      Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+
+    HTable table;
+
+    @Override
+    public void configure(JobConf job) {
+      String tableName = job.get(MATRIX, "");
+      try {
+        table = new HTable(new HBaseConfiguration(job), tableName);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void map(IntWritable key, MapWritable value,
+        OutputCollector<IntWritable, MapWritable> collector,
+        Reporter reporter) throws IOException {
+      int row, col;
+      row = key.get();
+      VectorUpdate vu = new VectorUpdate(row);
+
+      double val;
+      double maxVal = Double.MIN_VALUE;
+      int maxInd = row + 1;
+
+      boolean init = true;
+      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+        val = ((DoubleEntry) e.getValue()).getValue();
+        col = ((IntWritable) e.getKey()).get();
+        // copy the original matrix into the "EICOL" family
+        vu.put(JacobiEigenValue.EICOL, col, val);
+        // initialize "EIVEC" as the identity matrix
+        vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0);
+        if (col == row) {
+          vu.put(JacobiEigenValue.EIVAL, val);
+        }
+        // find the max index
+        if (col > row) {
+          if (init) {
+            maxInd = col;
+            maxVal = val;
+            init = false;
+          } else {
+            if (Math.abs(val) > Math.abs(maxVal)) {
+              maxVal = val;
+              maxInd = col;
+            }
+          }
+        }
+      }
+      // index array
+      vu.put(JacobiEigenValue.EIIND, maxInd);
+      // the changed array is set to true during initialization
+      vu.put(JacobiEigenValue.EICHANGED, 1);
+
+      table.commit(vu.getBatchUpdate());
+    }
+
+  }
+
+  /**
+   * PivotInputFormat, PivotMapper, and PivotReducer are used to find the
+   * pivot of a matrix.
+   */
+  public static class PivotInputFormat extends HTableInputFormatBase implements
+      InputFormat<Pair, DoubleWritable>, JobConfigurable {
+
+    private PivotRecordReader tableRecordReader;
+
+    protected static class PivotRecordReader extends HTableRecordReaderBase
+        implements RecordReader<Pair, DoubleWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      private int size;
+      boolean mocked = true;
+
+      @Override
+      public void init() throws IOException {
+        super.init();
+
+        Cell rows = null;
+        rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0;          // so we just skip it.
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+          } else {
+            totalRows = BytesUtil.bytesToInt(endRow)
+                - BytesUtil.bytesToInt(startRow);
+          }
+        }
+        processedRows = 0;
+        LOG.info("Split (" + Bytes.toString(startRow) + ", "
+            + Bytes.toString(endRow) + ") -> " + totalRows);
+      }
+
+      /**
+       * @return Pair
+       *
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public Pair createKey() {
+        return new Pair();
+      }
+
+      /**
+       * @return DoubleWritable
+       *
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public DoubleWritable createValue() {
+        return new DoubleWritable();
+      }
+
+      /**
+       * @param key Pair as input key.
+       * @param value DoubleWritable as input value
+       *
+       * Converts Scanner.next() to Pair, DoubleWritable
+       *
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(Pair key, DoubleWritable value) throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId == size - 1) { // skip the last row
+            if (mocked) {
+              key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+              mocked = false;
+              return true;
+            } else {
+              return false;
+            }
+          }
+
+          byte[] col = result.get(EIIND).getValue();
+          int colId = BytesUtil.bytesToInt(col);
+          double val = 0;
+
+          // get (rowId, colId)'s value
+          Cell cell = htable.get(BytesUtil.getRowIndex(rowId),
+              Bytes.toBytes(EICOL + colId));
+          if (cell != null && cell.getValue() != null) {
+            val = BytesUtil.bytesToDouble(cell.getValue());
+          }
+
+          key.set(rowId, colId);
+          value.set(val);
+
+          lastRow = row;
+          processedRows++;
+        } else {
+          if (mocked) {
+            key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+            mocked = false;
+            return true;
+          } else {
+            return false;
+          }
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    @Override
+    public RecordReader<Pair, DoubleWritable> getRecordReader(InputSplit split,
+        JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      PivotRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new PivotRecordReader();
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(PivotRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+
+  // find the pivot of the matrix
+  public static class PivotMapper extends MapReduceBase implements
+      Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+    private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+    private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+    @Override
+    public void map(Pair key, DoubleWritable value,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      if (key.getRow() != Integer.MAX_VALUE) {
+        if (Math.abs(value.get()) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = value.get();
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+        collector.collect(dummyPair, dummyVal);
+      }
+    }
+
+  }
+
+  public static class PivotReducer extends MapReduceBase implements
+      Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+    private double max = 0;
+    private Pair pair = new Pair(0, 0);
+
+    @Override
+    public void reduce(Pair key, Iterator<DoubleWritable> values,
+        OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+        throws IOException {
+      double val;
+      if (key.getRow() != Integer.MAX_VALUE) {
+        val = values.next().get();
+        if (Math.abs(val) > Math.abs(max)) {
+          pair.set(key.getRow(), key.getColumn());
+          max = val;
+        }
+      } else {
+        collector.collect(pair, new DoubleWritable(max));
+      }
+    }
+
+  }
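+
+  /*
+   * For reference, the rotation applied by the classes below is the classical
+   * Jacobi update for a symmetric matrix S with pivot p = S[k][l] (k = pivot
+   * row, l = pivot column). With y = (e_l - e_k) / 2,
+   * t = |y| + sqrt(p^2 + y^2), c = t / sqrt(p^2 + t^2) and
+   * s = p / sqrt(p^2 + t^2) (sign of s flipped when y < 0), each affected
+   * pair of entries is replaced by
+   *
+   *   S'[i][k] = c * S[i][k] - s * S[i][l]
+   *   S'[i][l] = s * S[i][k] + c * S[i][l]
+   *
+   * which is the (pivotcos * s1 - pivotsin * s2) and
+   * (pivotsin * s1 + pivotcos * s2) arithmetic in RotationRecordReader.next().
+   */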
+
+  /**
+   * Tricky here! We rotate the matrix while we scan it, writing the updates
+   * straight back to the table, so a RotationRecordReader alone is enough to
+   * scan the matrix and perform the rotation; the mapper and reducer are just
+   * dummies.
+   */
+  public static class RotationInputFormat extends HTableInputFormatBase
+      implements InputFormat<NullWritable, NullWritable>, JobConfigurable {
+
+    private RotationRecordReader tableRecordReader;
+
+    int pivot_row, pivot_col;
+    double pivot_cos, pivot_sin;
+
+    public void configure(JobConf job) {
+      super.configure(job);
+      pivot_row = job.getInt(PIVOTROW, -1);
+      pivot_col = job.getInt(PIVOTCOL, -1);
+      pivot_sin = Double.parseDouble(job.get(PIVOTSIN));
+      pivot_cos = Double.parseDouble(job.get(PIVOTCOS));
+    }
+
+    protected static class RotationRecordReader extends HTableRecordReaderBase
+        implements RecordReader<NullWritable, NullWritable> {
+
+      private int totalRows;
+      private int processedRows;
+      int startRowId, endRowId = -1;
+      int size;
+
+      int pivotrow, pivotcol;
+      byte[] prow, pcol;
+      double pivotcos, pivotsin;
+
+      public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+        super();
+        pivotrow = pr;
+        pivotcol = pc;
+        pivotsin = psin;
+        pivotcos = pcos;
+        prow = Bytes.toBytes(pivotrow);
+        pcol = Bytes.toBytes(pivotcol);
+      }
+
+      @Override
+      public void init() throws IOException {
+        super.init();
+
+        Cell rows = null;
+        rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+        size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+        if (endRow.length == 0) { // the last split, we don't know the end row
+          totalRows = 0;          // so we just skip it.
+          if (startRow.length == 0)
+            startRowId = 0;
+          else
+            startRowId = BytesUtil.bytesToInt(startRow);
+          endRowId = -1;
+        } else {
+          if (startRow.length == 0) { // the first split, start row is 0
+            totalRows = BytesUtil.bytesToInt(endRow);
+            startRowId = 0;
+            endRowId = totalRows;
+          } else {
+            startRowId = BytesUtil.bytesToInt(startRow);
+            endRowId = BytesUtil.bytesToInt(endRow);
+            totalRows = endRowId - startRowId;
+          }
+        }
+        processedRows = 0;
+        LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows);
+      }
+
+      /**
+       * @return NullWritable
+       *
+       * @see org.apache.hadoop.mapred.RecordReader#createKey()
+       */
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @return NullWritable
+       *
+       * @see org.apache.hadoop.mapred.RecordReader#createValue()
+       */
+      public NullWritable createValue() {
+        return NullWritable.get();
+      }
+
+      /**
+       * @param key NullWritable as input key.
+       * @param value NullWritable as input value
+       *
+       * Converts Scanner.next() to NullWritable, NullWritable
+       *
+       * @return true if there was more data
+       * @throws IOException
+       */
+      public boolean next(NullWritable key, NullWritable value)
+          throws IOException {
+        RowResult result;
+        try {
+          result = this.scanner.next();
+        } catch (UnknownScannerException e) {
+          LOG.debug("recovered from " + StringUtils.stringifyException(e));
+          restart(lastRow);
+          this.scanner.next(); // skip presumed already mapped row
+          result = this.scanner.next();
+        }
+
+        double s1, s2;
+        VectorUpdate bu;
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+          byte[] row = result.getRow();
+          int rowId = BytesUtil.bytesToInt(row);
+          if (rowId < pivotrow) {
+            s1 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotrow) {
+            return true;
+          } else if (rowId < pivotcol) {
+            s1 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(pivotrow),
+                Bytes.toBytes(EICOL + rowId)).getValue());
+            s2 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(rowId),
+                Bytes.toBytes(EICOL + pivotcol)).getValue());
+
+            bu = new VectorUpdate(rowId);
+            bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+            htable.commit(bu.getBatchUpdate());
+
+            bu = new VectorUpdate(pivotrow);
+            bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+            htable.commit(bu.getBatchUpdate());
+          } else if (rowId == pivotcol) {
+            for (int i = pivotcol + 1; i < size; i++) {
+              s1 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(pivotrow),
+                  Bytes.toBytes(EICOL + i)).getValue());
+              s2 = BytesUtil.bytesToDouble(htable.get(BytesUtil.getRowIndex(pivotcol),
+                  Bytes.toBytes(EICOL + i)).getValue());
+
+              bu = new VectorUpdate(pivotcol);
+              bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2);
+              htable.commit(bu.getBatchUpdate());
+
+              bu = new VectorUpdate(pivotrow);
+              bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2);
+              htable.commit(bu.getBatchUpdate());
+            }
+          } else { // rowId > pivotcol
+            return false;
+          }
+
+          lastRow = row;
+          processedRows++;
+        }
+        return hasMore;
+      }
+
+      @Override
+      public float getProgress() {
+        if (totalRows <= 0) {
+          return 0;
+        } else {
+          return Math.min(1.0f, processedRows / (float) totalRows);
+        }
+      }
+
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+      InputSplit[] splits = super.getSplits(job, numSplits);
+      List<InputSplit> newSplits = new ArrayList<InputSplit>();
+      for (InputSplit split : splits) {
+        TableSplit ts = (TableSplit) split;
+        byte[] row = ts.getStartRow();
+        if (row.length == 0) // the first split
+          newSplits.add(split);
+        else {
+          if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) {
+            newSplits.add(split);
+          }
+        }
+      }
+
+      return newSplits.toArray(new InputSplit[newSplits.size()]);
+    }
+
+    @Override
+    public RecordReader<NullWritable, NullWritable> getRecordReader(
+        InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+      TableSplit tSplit = (TableSplit) split;
+      RotationRecordReader trr = this.tableRecordReader;
+      // if no table record reader was provided use default
+      if (trr == null) {
+        trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin,
+            pivot_cos);
+      }
+      trr.setStartRow(tSplit.getStartRow());
+      trr.setEndRow(tSplit.getEndRow());
+      trr.setHTable(this.table);
+      trr.setInputColumns(this.inputColumns);
+      trr.setRowFilter(this.rowFilter);
+      trr.init();
+      return trr;
+    }
+
+    protected void setTableRecordReader(RotationRecordReader tableRecordReader) {
+      this.tableRecordReader = tableRecordReader;
+    }
+
+  }
+
+}
Index: src/java/org/apache/hama/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/DenseMatrix.java (revision 788280)
+++ src/java/org/apache/hama/DenseMatrix.java (working copy)
@@ -33,26 +33,34 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.algebra.BlockMultiplyMap;
 import org.apache.hama.algebra.BlockMultiplyReduce;
 import org.apache.hama.algebra.DenseMatrixVectorMultMap;
 import org.apache.hama.algebra.DenseMatrixVectorMultReduce;
+import org.apache.hama.algebra.JacobiEigenValue;
 import org.apache.hama.algebra.RowCyclicAdditionMap;
 import org.apache.hama.algebra.RowCyclicAdditionReduce;
 import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.mapred.CollectBlocksMapper;
+import org.apache.hama.mapred.DummyMapper;
 import org.apache.hama.mapred.RandomMatrixMap;
 import org.apache.hama.mapred.RandomMatrixReduce;
 import org.apache.hama.mapred.VectorInputFormat;
@@ -666,4 +674,245 @@
     JobManager.execute(jobConf);
   }
+
+  /**
+   * Compute all the eigenvalues. Note: all the eigenvalues are collected in
+   * the "eival:value" column, and the eigenvector of a given eigenvalue is
+   * collected in the "eivec:" column family of the same row.
+   *
+   * TODO: we may need to expose an interface to access the eigenvalues and
+   * eigenvectors.
+   *
+   * @param loops the maximum number of rotation loops to run
+   * @throws IOException
+   */
+  public void jacobiEigenValue(int loops) throws IOException {
+    JobConf jobConf = new JobConf(config);
+
+    /******************************************************************
+     * Initialization
+     *
+     * An M/R job is used for initialization (such as preparing a copy
+     * of the original matrix in the "eicol:" family).
+ ******************************************************************/ + // initialization + jobConf.setJobName("JacobiEigen initialization MR job" + getPath()); + + jobConf.setMapperClass(JacobiEigenValue.InitMapper.class); + jobConf.setInputFormat(VectorInputFormat.class); + jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN); + + FileInputFormat.addInputPaths(jobConf, getPath()); + jobConf.set(JacobiEigenValue.MATRIX, getPath()); + jobConf.setOutputFormat(NullOutputFormat.class); + jobConf.setMapOutputKeyClass(IntWritable.class); + jobConf.setMapOutputValueClass(MapWritable.class); + + JobManager.execute(jobConf); + + final FileSystem fs = FileSystem.get(jobConf); + Pair pivotPair = new Pair(); + DoubleWritable pivotWritable = new DoubleWritable(); + VectorUpdate vu ; + + // loop + int size = this.getRows(); + int state = size; + int pivot_row, pivot_col; + double pivot; + double s, c, t, y; + + while(state != 0 && loops > 0) { + /****************************************************************** + * Find the pivot and its index(pivot_row, pivot_col) + * + * A M/R job is used to scan all the "eival:ind" to get the max + * absolute value of each row, and do a MAX aggregation of these + * max values to get the max value in the matrix. + ******************************************************************/ + jobConf = new JobConf(config); + jobConf.setJobName("Find Pivot MR job" + getPath()); + + jobConf.setNumReduceTasks(1); + + Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_" + System.currentTimeMillis()), "out"); + if(fs.exists(outDir)) + fs.delete(outDir, true); + + jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class); + jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class); + jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND); + FileInputFormat.addInputPaths(jobConf, getPath()); + jobConf.setMapOutputKeyClass(Pair.class); + jobConf.setMapOutputValueClass(DoubleWritable.class); + + jobConf.setOutputKeyClass(Pair.class); + jobConf.setOutputValueClass(DoubleWritable.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(jobConf, outDir); + + // update the out put dir of the job + outDir = FileOutputFormat.getOutputPath(jobConf); + + JobManager.execute(jobConf); + + //read outputs + Path inFile = new Path(outDir, "part-00000"); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf); + try { + reader.next(pivotPair, pivotWritable); + pivot_row = pivotPair.getRow(); + pivot_col = pivotPair.getColumn(); + pivot = pivotWritable.get(); + } finally { + reader.close(); + } + fs.delete(outDir.getParent(), true); + + /****************************************************************** + * Calculation + * + * Compute the rotation parameters of next rotation. 
+ ******************************************************************/ + double e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row), + Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue()); + double e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col), + Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue()); + + y = (e2 - e1) / 2; + t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y); + s = Math.sqrt(pivot * pivot + t * t); + c = t / s; + s = pivot / s; + t = (pivot * pivot) / t; + if(y < 0) { + s = -s; + t = -t; + } + + /****************************************************************** + * Upate the pivot and the eigen values indexed by the pivot + ******************************************************************/ + vu = new VectorUpdate(pivot_row); + vu.put(JacobiEigenValue.EICOL, pivot_col, 0); + table.commit(vu.getBatchUpdate()); + + state = update(pivot_row, -t, state); + state = update(pivot_col, t, state); + + /****************************************************************** + * Rotation the matrix + ******************************************************************/ + // rotation + jobConf = new JobConf(config); + jobConf.setJobName("Rotation Matrix MR job" + getPath()); + + jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row); + jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col); + jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s)); + jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c)); + + jobConf.setMapperClass(DummyMapper.class); + jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class); + jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND); + FileInputFormat.addInputPaths(jobConf, getPath()); + jobConf.setMapOutputKeyClass(NullWritable.class); + jobConf.setMapOutputValueClass(NullWritable.class); + FileInputFormat.addInputPaths(jobConf, getPath()); + jobConf.setOutputFormat(NullOutputFormat.class); + + JobManager.execute(jobConf); + + // rotate eigenvectors + LOG.info("rotating eigenvector"); + for(int i = 0; i < size; i++) { + e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row), + Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue()); + e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col), + Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue()); + + vu = new VectorUpdate(pivot_row); + vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2); + table.commit(vu.getBatchUpdate()); + + vu = new VectorUpdate(pivot_col); + vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2); + table.commit(vu.getBatchUpdate()); + } + + LOG.info("update index..."); + // update index array + maxind(pivot_row, size); + maxind(pivot_col, size); + + loops --; + } + } + + void maxind(int row, int size) throws IOException { + int m = row + 1; + if(row + 2 < size) { + double max = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row), + Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue()); + double val; + for(int i=row + 2; i Math.abs(max)) { + m = i; + max = val; + } + } + } + + VectorUpdate vu = new VectorUpdate(row); + vu.put(JacobiEigenValue.EIIND, m); + table.commit(vu.getBatchUpdate()); + } + + int update(int row, double value, int state) throws IOException { + double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row), + Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue()); + int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row), + Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue()); + double y = e; + e += value; + + VectorUpdate vu 
+    VectorUpdate vu = new VectorUpdate(row);
+    vu.put(JacobiEigenValue.EIVAL, e);
+    if (changed == 1 && y == e) {
+      changed = 0;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state--;
+    } else if (changed == 0 && y != e) {
+      changed = 1;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state++;
+    }
+    table.commit(vu.getBatchUpdate());
+    return state;
+  }
+
+  // for test
+  boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+    boolean success = true;
+    double e1, ev, gap;
+    for (int i = 0; i < e.length; i++) {
+      e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      gap = e1 - e[i];
+      success = (gap < 0.000001 && gap > -0.000001);
+      if (!success) return success;
+
+      for (int j = 0; j < E[i].length; j++) {
+        ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+            Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+        gap = ev - E[i][j];
+        success = (gap < 0.000001 && gap > -0.000001);
+        if (!success) return success;
+      }
+    }
+    return success;
+  }
 }
Index: src/java/org/apache/hama/io/Pair.java
===================================================================
--- src/java/org/apache/hama/io/Pair.java (revision 0)
+++ src/java/org/apache/hama/io/Pair.java (revision 0)
@@ -0,0 +1,65 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/** A Pair stands for a (row, column) pair **/
+public class Pair implements WritableComparable<Pair> {
+
+  int row, col;
+
+  public Pair() {}
+
+  public Pair(int row_, int col_) {
+    set(row_, col_);
+  }
+
+  public int getRow() { return row; }
+  public int getColumn() { return col; }
+
+  public void setRow(int row_) { row = row_; }
+  public void setColumn(int col_) { col = col_; }
+  public void set(int row_, int col_) {
+    row = row_;
+    col = col_;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    row = in.readInt();
+    col = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(row);
+    out.writeInt(col);
+  }
+
+  @Override
+  public int compareTo(Pair p) {
+    return row == p.row ? col - p.col : row - p.row;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    Pair pair = (Pair) obj;
+    return compareTo(pair) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return row;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append('(').append(row).append(',').append(col).append(')');
+    return sb.toString();
+  }
+
+}
Index: src/java/org/apache/hama/io/VectorUpdate.java
===================================================================
--- src/java/org/apache/hama/io/VectorUpdate.java (revision 788280)
+++ src/java/org/apache/hama/io/VectorUpdate.java (working copy)
@@ -50,6 +50,20 @@
     this.batchUpdate.put(BytesUtil.getColumnIndex(j), BytesUtil
         .doubleToBytes(value));
   }
+
+  /**
+   * Put the value in the "cfName + j" column
+   * @param cfName the column family name
+   * @param j the column index
+   * @param value the value to store
+   */
+  public void put(String cfName, int j, double value) {
+    this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
+  }
+
+  public void put(String name, double value) {
+    this.batchUpdate.put(Bytes.toBytes(name), Bytes.toBytes(value));
+  }
 
   public void put(int j, String name) {
     this.batchUpdate.put(Bytes.toBytes((Constants.ATTRIBUTE + j)), Bytes
Index: src/java/org/apache/hama/mapred/DummyMapper.java
===================================================================
--- src/java/org/apache/hama/mapred/DummyMapper.java (revision 0)
+++ src/java/org/apache/hama/mapred/DummyMapper.java (revision 0)
@@ -0,0 +1,20 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/** A dummy mapper: it consumes its input and emits nothing. */
+public class DummyMapper<K, V> extends MapReduceBase
+    implements Mapper<K, V, K, V> {
+
+  /** The dummy function; does nothing. */
+  public void map(K key, V val,
+      OutputCollector<K, V> output, Reporter reporter)
+      throws IOException {
+    // do nothing
+  }
+}
\ No newline at end of file
Index: src/test/org/apache/hama/TestDenseMatrix.java
===================================================================
--- src/test/org/apache/hama/TestDenseMatrix.java (revision 788280)
+++ src/test/org/apache/hama/TestDenseMatrix.java (working copy)
@@ -43,7 +43,7 @@
   private static Matrix m1;
   private static Matrix m2;
   private static Matrix m3;
-  private static Matrix m4;
+  private static Matrix m4, m5;
   private final static String aliase1 = "matrix_aliase_A";
   private final static String aliase2 = "matrix_aliase_B";
   private static HamaConfiguration conf;
@@ -64,6 +64,7 @@
     m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
     m3 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
     m4 = DenseMatrix.random(hCluster.getConf(), SIZE-2, SIZE-2);
+    m5 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
   }
 
   protected void tearDown() {
@@ -309,7 +310,106 @@
       i++;
     }
   }
- 
+
+  public void testJacobiEigenValue() throws IOException {
+    // copy Matrix m5 into a local array
+    double[][] S = new double[SIZE][SIZE];
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        S[i][j] = m5.get(i, j);
+      }
+    }
+
+    // do the m/r Jacobi eigenvalue computation
+    DenseMatrix dm = (DenseMatrix) m5;
+    dm.jacobiEigenValue(3);
+
+    // do the Jacobi eigenvalue computation over the local S array
+    int i, j, k, l, m, state;
+    double s, c, t, p, y;
+    double e1, e2;
+    // index array
+    int[] ind = new int[SIZE];
+    boolean[] changed = new boolean[SIZE];
+
+    // output
+    double[] e = new double[SIZE];
+    double[][] E = new double[SIZE][SIZE];
+
+    // init e & E; ind & changed
+    for (i = 0; i < SIZE; i++) {
+      for (j = 0; j < SIZE; j++) {
+        E[i][j] = (i == j) ? 1 : 0;
+      }
+      e[i] = S[i][i];
+      ind[i] = maxind(S, i, SIZE);
+      changed[i] = true;
+    }
+
+    int loops = 3;
+    state = SIZE;
+
+    while (state != 0 && loops > 0) {
+      // find index (k, l) for pivot p
+      m = 0;
+      for (k = 1; k <= SIZE - 2; k++) {
+        if (Math.abs(S[m][ind[m]]) < Math.abs(S[k][ind[k]])) {
+          m = k;
+        }
+      }
+
+      k = m; l = ind[m]; p = S[k][l];
+
+      // calculate c = cos, s = sin
+      y = (e[l] - e[k]) / 2;
+      t = Math.abs(y) + Math.sqrt(p * p + y * y);
+      s = Math.sqrt(p * p + t * t);
+      c = t / s;
+      s = p / s;
+      t = (p * p) / t;
+      if (y < 0) {
+        s = -s;
+        t = -t;
+      }
+
+      S[k][l] = 0.0;
+      state = update(e, changed, k, -t, state);
+      state = update(e, changed, l, t, state);
+
+      for (i = 0; i <= k - 1; i++)
+        rotate(S, i, k, i, l, c, s);
+
+      for (i = l + 1; i < SIZE; i++)
+        rotate(S, k, i, l, i, c, s);
+
+      for (i = k + 1; i <= l - 1; i++)
+        rotate(S, k, i, i, l, c, s);
+
+      // rotate the eigenvectors
+      for (i = 0; i < SIZE; i++) {
+        e1 = E[k][i];
+        e2 = E[l][i];
+
+        E[k][i] = c * e1 - s * e2;
+        E[l][i] = s * e1 + c * e2;
+      }
+
+      ind[k] = maxind(S, k, SIZE);
+      ind[l] = maxind(S, l, SIZE);
+
+      loops--;
+    }
+
+    // verify the results
+    assertTrue(dm.verifyEigenValue(e, E));
+  }
+
   public void testEnsureForAddition() {
     try {
       m1.add(m4);
@@ -437,5 +537,37 @@
       }
     }
   }
+
+  // index of the largest off-diagonal element in row k
+  int maxind(double[][] S, int row, int size) {
+    int m = row + 1;
+    for (int i = row + 2; i < size; i++) {
+      if (Math.abs(S[row][i]) > Math.abs(S[row][m]))
+        m = i;
+    }
+    return m;
+  }
+
+  int update(double[] e, boolean[] changed, int row, double value, int state) {
+    double y = e[row];
+    e[row] += value;
+
+    if (changed[row] && y == e[row]) {
+      changed[row] = false;
+      return state - 1;
+    } else if (!changed[row] && y != e[row]) {
+      changed[row] = true;
+      return state + 1;
+    } else
+      return state;
+  }
+
+  void rotate(double[][] S, int k, int l, int i, int j, double c, double s) {
+    double s1 = S[k][l], s2 = S[i][j];
+    S[k][l] = c * s1 - s * s2;
+    S[i][j] = s * s1 + c * s2;
+  }
+
 }
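
For anyone trying the patch, a minimal sketch of how the new computation might be
invoked from client code (the configuration, matrix size, and loop count below are
illustrative assumptions rather than part of the patch, and the Jacobi method
itself expects a symmetric input matrix):

    HamaConfiguration conf = new HamaConfiguration();
    // random() mirrors what TestDenseMatrix does; a real caller would load its own data
    Matrix m = DenseMatrix.random(conf, 10, 10);
    // run at most 10 rotation loops; the eigenvalues end up in the "eival:value"
    // column and the eigenvectors in the "eivec:" column family of the matrix table
    ((DenseMatrix) m).jacobiEigenValue(10);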