Index: src/examples/org/apache/hama/examples/JacobiEigen.java
===================================================================
--- src/examples/org/apache/hama/examples/JacobiEigen.java	(revision 0)
+++ src/examples/org/apache/hama/examples/JacobiEigen.java	(revision 0)
@@ -0,0 +1,366 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaAdmin;
+import org.apache.hama.HamaAdminImpl;
+import org.apache.hama.examples.mapreduce.DummyMapper;
+import org.apache.hama.examples.mapreduce.JacobiInitMap;
+import org.apache.hama.examples.mapreduce.PivotInputFormat;
+import org.apache.hama.examples.mapreduce.PivotMap;
+import org.apache.hama.examples.mapreduce.RotationInputFormat;
+import org.apache.hama.io.Pair;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.util.BytesUtil;
+
+public class JacobiEigen extends AbstractExample {
+
+  /**
+   * Eigenvalue constants
+   */
+  /** a working copy of the original matrix, kept in the "eicol" family */
+  public static final String EICOL = "eicol";
+
+  /** a column family collecting all values and statuses used during computation */
+  public static final String EI = "eival";
+  public static final String EIVAL = "value";
+  public static final String EICHANGED = "changed";
+
+  /** a column identifying the index of the max absolute value in each row */
+  public static final String EIIND = "ind";
+
+  /** a matrix collecting all the eigenvectors */
+  public static final String EIVEC = "eivec";
+  public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+
+  /** parameters for the pivot */
+  public static final String PIVOTROW = "hama.jacobi.pivot.row";
+  public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+  public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+  public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+  private static HTable table;
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 1) {
+      System.out
+          .println("jacobieigen <matrix_name> [-m maps] [-r reduces] [max_iteration]");
+      System.exit(-1);
+    } else {
+      parseArgs(args);
+    }
+
+    String matrixA = ARGS.get(0);
+
+    HamaAdmin admin = new HamaAdminImpl(conf);
+    Matrix a = admin.getMatrix(matrixA);
+
+    if (ARGS.size() > 1) {
+      jacobiEigenValue((DenseMatrix) a, Integer.parseInt(ARGS.get(1)));
+    } else {
+      jacobiEigenValue((DenseMatrix) a, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Computes all the eigenvalues. Note: all the eigenvalues are collected in
+   * the "eival:value" column, and the eigenvector of a specified eigenvalue
+   * is collected in the "eivec:" column family in the same row.
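+   *
+   * A minimal usage sketch (the matrix name here is illustrative; the matrix
+   * is assumed to be a symmetric DenseMatrix already stored in HBase):
+   *
+   * <pre>
+   * HamaAdmin admin = new HamaAdminImpl(conf);
+   * DenseMatrix a = (DenseMatrix) admin.getMatrix("my_symmetric_matrix");
+   * JacobiEigen.jacobiEigenValue(a, 100); // run at most 100 rotations
+   * </pre>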
+   *
+   * TODO: we may need to expose an interface to access the eigenvalues and
+   * vectors
+   *
+   * @param imax limits the number of loops of the computation
+   * @throws IOException
+   */
+  public static void jacobiEigenValue(DenseMatrix m, int imax)
+      throws IOException {
+    table = new HTable(conf, m.getPath());
+    /*
+     * Initialization. An M/R job is used for initialization (such as
+     * preparing a matrix copy of the original in the "eicol:" family).
+     */
+    // initialization
+    Job job = new Job(conf, "JacobiEigen initialization MR job" + m.getPath());
+
+    Scan scan = new Scan();
+    scan.addFamily(Constants.COLUMNFAMILY);
+
+    TableMapReduceUtil.initTableMapperJob(m.getPath(), scan,
+        JacobiInitMap.class, ImmutableBytesWritable.class, Put.class, job);
+    TableMapReduceUtil.initTableReducerJob(m.getPath(),
+        IdentityTableReducer.class, job);
+
+    try {
+      job.waitForCompletion(true);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+
+    final FileSystem fs = FileSystem.get(conf);
+    Pair pivotPair = new Pair();
+    DoubleWritable pivotWritable = new DoubleWritable();
+    Put put;
+
+    // loop
+    int size = m.getRows();
+    int state = size;
+    int pivot_row, pivot_col;
+    double pivot;
+    double s, c, t, y;
+
+    int icount = 0;
+    while (state != 0 && icount < imax) {
+      icount = icount + 1;
+      /*
+       * Find the pivot and its index (pivot_row, pivot_col). An M/R job is
+       * used to scan all the "eival:ind" columns to get the max absolute
+       * value of each row, and to do a MAX aggregation of these per-row
+       * maxima to get the max value in the matrix.
+       */
+      Path outDir = new Path(new Path(m.getType() + "_TMP_FindPivot_dir_"
+          + System.currentTimeMillis()), "out");
+      if (fs.exists(outDir))
+        fs.delete(outDir, true);
+
+      job = new Job(conf, "Find Pivot MR job" + m.getPath());
+
+      scan = new Scan();
+      scan.addFamily(Bytes.toBytes(EI));
+
+      job.setInputFormatClass(PivotInputFormat.class);
+      job.setMapOutputKeyClass(Pair.class);
+      job.setMapOutputValueClass(DoubleWritable.class);
+      job.setMapperClass(PivotMap.class);
+      job.getConfiguration().set(PivotInputFormat.INPUT_TABLE, m.getPath());
+      job.getConfiguration().set(PivotInputFormat.SCAN,
+          PivotInputFormat.convertScanToString(scan));
+
+      job.setOutputKeyClass(Pair.class);
+      job.setOutputValueClass(DoubleWritable.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+
+      // read the outputs
+      Path inFile = new Path(outDir, "part-r-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
+      try {
+        reader.next(pivotPair, pivotWritable);
+        pivot_row = pivotPair.getRow();
+        pivot_col = pivotPair.getColumn();
+        pivot = pivotWritable.get();
+      } finally {
+        reader.close();
+      }
+      fs.delete(outDir, true);
+      fs.delete(outDir.getParent(), true);
+
+      if (pivot_row == 0 && pivot_col == 0)
+        break; // stop the iterations
+
+      /*
+       * Calculation: compute the rotation parameters of the next rotation.
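+       *
+       * A sketch of the algebra (the standard Jacobi/Givens rotation, spelled
+       * out here for clarity): with pivot p = a[i][j] and diagonal estimates
+       * e1 = a[i][i], e2 = a[j][j], let y = (e2 - e1) / 2. The code below
+       * computes tan(phi) = s / c = p / (|y| + sqrt(p^2 + y^2)), and t ends
+       * up as p * tan(phi), the amount by which the two diagonal estimates
+       * are pushed apart (signs flipped when y < 0).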
+ */ + Get get = new Get(BytesUtil.getRowIndex(pivot_row)); + get.addFamily(Bytes.toBytes(EI)); + Result r = table.get(get); + double e1 = Bytes.toDouble(r.getValue(Bytes.toBytes(EI), Bytes + .toBytes(EIVAL))); + + get = new Get(BytesUtil.getRowIndex(pivot_col)); + get.addFamily(Bytes.toBytes(EI)); + r = table.get(get); + double e2 = Bytes.toDouble(r.getValue(Bytes.toBytes(EI), Bytes + .toBytes(EIVAL))); + + y = (e2 - e1) / 2; + t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y); + s = Math.sqrt(pivot * pivot + t * t); + c = t / s; + s = pivot / s; + t = (pivot * pivot) / t; + if (y < 0) { + s = -s; + t = -t; + } + + /* + * Upate the pivot and the eigen values indexed by the pivot + */ + put = new Put(BytesUtil.getRowIndex(pivot_row)); + put.add(Bytes.toBytes(EICOL), Bytes.toBytes(String + .valueOf(pivot_col)), Bytes.toBytes(0.0)); + table.put(put); + + state = update(pivot_row, -t, state); + state = update(pivot_col, t, state); + + /* + * Rotation the matrix + */ + job = new Job(conf, "Rotation Matrix MR job" + m.getPath()); + + scan = new Scan(); + scan.addFamily(Bytes.toBytes(EI)); + + job.getConfiguration().setInt(PIVOTROW, pivot_row); + job.getConfiguration().setInt(PIVOTCOL, pivot_col); + job.getConfiguration().set(PIVOTSIN, String.valueOf(s)); + job.getConfiguration().set(PIVOTCOS, String.valueOf(c)); + + job.setInputFormatClass(RotationInputFormat.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setMapperClass(DummyMapper.class); + job.getConfiguration().set(RotationInputFormat.INPUT_TABLE, m.getPath()); + job.getConfiguration().set(RotationInputFormat.SCAN, + PivotInputFormat.convertScanToString(scan)); + job.setOutputFormatClass(NullOutputFormat.class); + + try { + job.waitForCompletion(true); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + + // rotate eigenvectors + for (int i = 0; i < size; i++) { + get = new Get(BytesUtil.getRowIndex(pivot_row)); + e1 = Bytes.toDouble(table.get(get).getValue( + Bytes.toBytes(EIVEC), Bytes.toBytes(String.valueOf(i)))); + + get = new Get(BytesUtil.getRowIndex(pivot_col)); + e2 = Bytes.toDouble(table.get(get).getValue( + Bytes.toBytes(EIVEC), Bytes.toBytes(String.valueOf(i)))); + + put = new Put(BytesUtil.getRowIndex(pivot_row)); + put.add(Bytes.toBytes(EIVEC), Bytes + .toBytes(String.valueOf(i)), Bytes.toBytes(new Double(c * e1 - s + * e2))); + table.put(put); + + double eivC = (s * e1 + c * e2); + put = new Put(BytesUtil.getRowIndex(pivot_col)); + put.add(Bytes.toBytes(EIVEC), Bytes + .toBytes(String.valueOf(i)), Bytes.toBytes(eivC)); + table.put(put); + } + + // update index array + maxind(pivot_row, size); + maxind(pivot_col, size); + } + } + + static void maxind(int row, int size) throws IOException { + int m = row + 1; + Get get = null; + if (row + 2 < size) { + get = new Get(BytesUtil.getRowIndex(row)); + + double max = Bytes.toDouble(table.get(get).getValue( + Bytes.toBytes(EICOL), Bytes.toBytes(String.valueOf(m)))); + double val; + for (int i = row + 2; i < size; i++) { + get = new Get(BytesUtil.getRowIndex(row)); + val = Bytes.toDouble(table.get(get).getValue( + Bytes.toBytes(EICOL), Bytes.toBytes(String.valueOf(i)))); + if (Math.abs(val) > Math.abs(max)) { + m = i; + max = val; + } + } + } + + Put put = new Put(BytesUtil.getRowIndex(row)); + put.add(Bytes.toBytes(EI), Bytes.toBytes("ind"), Bytes + .toBytes(String.valueOf(m))); + table.put(put); + } + + static int update(int row, double value, 
+      int state) throws IOException {
+    Get get = new Get(BytesUtil.getRowIndex(row));
+    double e = Bytes.toDouble(table.get(get).getValue(Bytes.toBytes(EI),
+        Bytes.toBytes(EIVAL)));
+    int changed = BytesUtil.bytesToInt(table.get(get).getValue(
+        Bytes.toBytes(EI), Bytes.toBytes(EICHANGED)));
+    double y = e;
+    e += value;
+
+    Put put = new Put(BytesUtil.getRowIndex(row));
+    put.add(Bytes.toBytes(EI), Bytes.toBytes(EIVAL), Bytes.toBytes(e));
+
+    if (changed == 1 && (Math.abs(y - e) < .0000001)) { // effectively y == e
+      changed = 0;
+      put.add(Bytes.toBytes(EI), Bytes.toBytes(EICHANGED),
+          Bytes.toBytes(String.valueOf(changed)));
+
+      state--;
+    } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
+      changed = 1;
+      put.add(Bytes.toBytes(EI), Bytes.toBytes(EICHANGED),
+          Bytes.toBytes(String.valueOf(changed)));
+
+      state++;
+    }
+    table.put(put);
+    return state;
+  }
+
+  // for test
+  static boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+    boolean success = true;
+    double e1, ev;
+    Get get = null;
+    for (int i = 0; i < e.length; i++) {
+      get = new Get(BytesUtil.getRowIndex(i));
+      e1 = Bytes.toDouble(table.get(get).getValue(Bytes.toBytes(EI),
+          Bytes.toBytes(EIVAL)));
+      success &= ((Math.abs(e1 - e[i]) < .0000001));
+      if (!success)
+        return success;
+
+      for (int j = 0; j < E[i].length; j++) {
+        get = new Get(BytesUtil.getRowIndex(i));
+        ev = Bytes.toDouble(table.get(get).getValue(Bytes.toBytes(EIVEC),
+            Bytes.toBytes(String.valueOf(j))));
+        success &= ((Math.abs(ev - E[i][j]) < .0000001));
+        if (!success)
+          return success;
+      }
+    }
+    return success;
+  }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/DummyMapper.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/DummyMapper.java	(revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/DummyMapper.java	(revision 0)
@@ -0,0 +1,15 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class DummyMapper<K, V> extends Mapper<K, V, K, V> {
+  /** The dummy function. */
+  public void map(K key, V val, OutputCollector<K, V> output, Reporter reporter)
+      throws IOException {
+    // do nothing
+  }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/JacobiInitMap.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/JacobiInitMap.java	(revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/JacobiInitMap.java	(revision 0)
@@ -0,0 +1,83 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
+import org.apache.hama.examples.JacobiEigen;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * The matrix will be modified while computing the eigenvalues, so a new copy
+ * is created to prevent the original matrix from being modified. To reduce
+ * network transfer, we copy the "column" family of the original matrix into
+ * an "eicol" family; all subsequent modifications are done on the "eicol"
+ * family.
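+ *
+ * For example, after initialization row i of an n-by-n matrix carries the
+ * following cells (an illustrative summary of the puts in map() below):
+ *
+ * <pre>
+ *   eicol:0 .. eicol:n-1   working copy of row i of the matrix
+ *   eivec:0 .. eivec:n-1   row i of the eigenvector matrix (identity at init)
+ *   eival:value            the diagonal entry, i.e. the eigenvalue estimate
+ *   eival:ind              column index of the max |value| right of the diagonal
+ *   eival:changed          1, meaning the estimate changed in the last step
+ * </pre>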
+ *
+ * The output eigenvector arrays are stored in the "eivec" family, the output
+ * eigenvalue array in "eival:value", and the temporary status arrays in
+ * "eival:changed" and "eival:ind".
+ *
+ * Also, "eival:state" records the rotation state of the matrix.
+ */
+public class JacobiInitMap extends TableMapper<ImmutableBytesWritable, Put> {
+
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    int row, col;
+    row = BytesUtil.getRowIndex(key.get());
+    Put put = new Put(BytesUtil.getRowIndex(row));
+
+    double val;
+    double maxVal = Double.MIN_VALUE;
+    int maxInd = row + 1;
+
+    boolean init = true;
+
+    NavigableMap<byte[], byte[]> map = value
+        .getFamilyMap(Constants.COLUMNFAMILY);
+    for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
+      val = Bytes.toDouble(e.getValue());
+      col = BytesUtil.bytesToInt(e.getKey());
+      // copy the original matrix into the "EICOL" family
+      put.add(Bytes.toBytes(JacobiEigen.EICOL),
+          Bytes.toBytes(String.valueOf(col)), Bytes.toBytes(val));
+      // make "EIVEC" a diagonal (identity) matrix
+      put.add(Bytes.toBytes(JacobiEigen.EIVEC),
+          Bytes.toBytes(String.valueOf(col)),
+          Bytes.toBytes(col == row ? 1.0 : 0.0));
+
+      if (col == row) {
+        put.add(Bytes.toBytes(JacobiEigen.EI),
+            Bytes.toBytes(JacobiEigen.EIVAL), Bytes.toBytes(val));
+      }
+      // find the max index
+      if (col > row) {
+        if (init) {
+          maxInd = col;
+          maxVal = val;
+          init = false;
+        } else {
+          if (Math.abs(val) > Math.abs(maxVal)) {
+            maxVal = val;
+            maxInd = col;
+          }
+        }
+      }
+    }
+
+    // the index array
+    put.add(Bytes.toBytes(JacobiEigen.EI), Bytes.toBytes(JacobiEigen.EIIND),
+        Bytes.toBytes(String.valueOf(maxInd)));
+    // the "changed" flags are set to true during initialization
+    put.add(Bytes.toBytes(JacobiEigen.EI),
+        Bytes.toBytes(JacobiEigen.EICHANGED), Bytes.toBytes(String.valueOf(1)));
+
+    context.write(key, put);
+  }
+}
Index: src/examples/org/apache/hama/examples/mapreduce/PivotInputFormat.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/PivotInputFormat.java	(revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/PivotInputFormat.java	(revision 0)
@@ -0,0 +1,286 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.examples.JacobiEigen;
+import org.apache.hama.io.Pair;
+import org.apache.hama.util.BytesUtil;
+
+public class PivotInputFormat extends InputFormat<Pair, DoubleWritable>
+    implements Configurable {
+  final static Log LOG = LogFactory.getLog(PivotInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter that carries the scan, base64-encoded. */
+  public static final String SCAN = "hama.mapreduce.scan";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private PivotRecordReader pivotRecordReader = null;
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte[][] startKeys = table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    int realNumSplits = startKeys.length;
+    InputSplit[] splits = new InputSplit[realNumSplits];
+    int middle = startKeys.length / realNumSplits;
+    int startPos = 0;
+    for (int i = 0; i < realNumSplits; i++) {
+      int lastPos = startPos + middle;
+      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos])
+          .getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(),
+          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
+              : HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+      startPos = lastPos;
+    }
+    return Arrays.asList(splits);
+  }
+
+  protected static class PivotRecordReader extends
+      RecordReader<Pair, DoubleWritable> {
+    private int totalRows;
+    private int processedRows;
+    private int size;
+    boolean mocked = true;
+
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+    private Pair key = null;
+    private DoubleWritable value = null;
+
+    @Override
+    public void close() {
+      this.scanner.close();
+    }
+
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    public void init() throws IOException {
+      restart(scan.getStartRow());
+    }
+
+    public void restart(byte[] firstRow) throws IOException {
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      this.scanner = this.htable.getScanner(newScan);
+    }
+
+    @Override
+    public Pair getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+
+    @Override
+    public DoubleWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (totalRows <= 0) {
+        return 0;
+      } else {
+        return Math.min(1.0f, processedRows / (float) totalRows);
+      }
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null)
+        key = new Pair();
+      if (value == null)
+        value = new DoubleWritable();
+
+      Result vv;
+      try {
+        vv = this.scanner.next();
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        restart(lastRow);
+        scanner.next(); // skip the presumed already-mapped row
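+        // the following call then returns the first row that has not been
+        // processed yet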
+        vv = scanner.next();
+      }
+
+      boolean hasMore = vv != null && vv.size() > 0;
+      if (hasMore) {
+
+        byte[] row = vv.getRow();
+
+        int rowId = BytesUtil.getRowIndex(row);
+        if (rowId == size - 1) { // skip the last row
+          if (mocked) {
+            key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+            mocked = false;
+            return true;
+          } else {
+            return false;
+          }
+        }
+
+        byte[] col = vv.getValue(Bytes.toBytes(JacobiEigen.EI),
+            Bytes.toBytes(JacobiEigen.EIIND));
+        int colId = BytesUtil.bytesToInt(col);
+        double val = 0;
+
+        Get get = new Get(BytesUtil.getRowIndex(rowId));
+        byte[] cell = htable.get(get).getValue(
+            Bytes.toBytes(JacobiEigen.EICOL),
+            Bytes.toBytes(String.valueOf(colId)));
+        if (cell != null) {
+          val = Bytes.toDouble(cell);
+        }
+
+        key.set(rowId, colId);
+        value.set(val);
+
+        lastRow = row;
+      } else {
+        if (mocked) {
+          key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+          mocked = false;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      return hasMore;
+    }
+  }
+
+  @Override
+  public RecordReader<Pair, DoubleWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    PivotRecordReader trr = this.pivotRecordReader;
+    // if no table record reader was provided, use the default
+    if (trr == null) {
+      trr = new PivotRecordReader();
+    }
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.init();
+    return trr;
+  }
+
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  public Scan getScan() {
+    if (this.scan == null)
+      this.scan = new Scan();
+    return scan;
+  }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  protected void setTableRecordReader(PivotRecordReader pivotRecordReader) {
+    this.pivotRecordReader = pivotRecordReader;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String tableName = conf.get(INPUT_TABLE);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+    Scan scan = null;
+    try {
+      scan = convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
+    }
+    setScan(scan);
+  }
+
+  public static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(out);
+    scan.write(dos);
+    return Base64.encodeBytes(out.toByteArray());
+  }
+
+  public static Scan convertStringToScan(String base64) throws IOException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+    DataInputStream dis = new DataInputStream(bis);
+    Scan scan = new Scan();
+    scan.readFields(dis);
+    return scan;
+  }
+
+}
Index: src/examples/org/apache/hama/examples/mapreduce/PivotMap.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/PivotMap.java	(revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/PivotMap.java	(revision 0)
@@ -0,0 +1,29 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hama.io.Pair;
+
+public class PivotMap extends
+    Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
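+  // In-mapper aggregation: map() keeps a running maximum of |value| and emits
+  // nothing until the mock record (Integer.MAX_VALUE, Integer.MAX_VALUE)
+  // produced by PivotRecordReader arrives; that sentinel flushes the single
+  // best (row, column) pair for this split.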
+  private double max = 0;
+  private Pair pair = new Pair(0, 0);
+  private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+  private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+  public void map(Pair key, DoubleWritable value, Context context)
+      throws IOException, InterruptedException {
+    if (key.getRow() != Integer.MAX_VALUE) {
+      if (Math.abs(value.get()) > Math.abs(max)) {
+        pair.set(key.getRow(), key.getColumn());
+        max = value.get();
+      }
+    } else {
+      context.write(pair, new DoubleWritable(max));
+      context.write(dummyPair, dummyVal);
+    }
+  }
+
+}
Index: src/examples/org/apache/hama/examples/mapreduce/RotationInputFormat.java
===================================================================
--- src/examples/org/apache/hama/examples/mapreduce/RotationInputFormat.java	(revision 0)
+++ src/examples/org/apache/hama/examples/mapreduce/RotationInputFormat.java	(revision 0)
@@ -0,0 +1,355 @@
+package org.apache.hama.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.examples.JacobiEigen;
+import org.apache.hama.util.BytesUtil;
+
+public class RotationInputFormat extends
+    InputFormat<NullWritable, NullWritable> implements Configurable {
+  final static Log LOG = LogFactory.getLog(RotationInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter that carries the scan, base64-encoded. */
+  public static final String SCAN = "hama.mapreduce.scan";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one.
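+   * Note that this input format is deliberately side-effecting: the record
+   * reader applies the Givens rotation to the "eicol" copy while it scans,
+   * and only hands NullWritable pairs to the DummyMapper, so the M/R job
+   * exists purely to parallelize the rotation across regions.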
*/ + private RotationRecordReader rotationRecordReader; + + int pivot_row, pivot_col; + double pivot_cos, pivot_sin; + + protected static class RotationRecordReader extends + RecordReader { + private ResultScanner scanner = null; + private Scan scan = null; + private HTable htable = null; + private byte[] lastRow = null; + + private int totalRows; + private int processedRows; + int startRowId, endRowId = -1; + int size; + + int pivotrow, pivotcol; + byte[] prow, pcol; + double pivotcos, pivotsin; + + public RotationRecordReader(int pr, int pc, double psin, double pcos) { + super(); + pivotrow = pr; + pivotcol = pc; + pivotsin = psin; + pivotcos = pcos; + prow = Bytes.toBytes(pivotrow); + pcol = Bytes.toBytes(pivotcol); + LOG.info(prow); + LOG.info(pcol); + } + + public void setScan(Scan scan) { + this.scan = scan; + } + + public void setHTable(HTable htable) { + this.htable = htable; + } + + public void init() throws IOException { + restart(scan.getStartRow()); + byte[] startRow = scan.getStartRow(); + byte[] endRow = scan.getStopRow(); + + Get get = new Get(Bytes.toBytes(Constants.METADATA)); + get.addFamily(Constants.ATTRIBUTE); + byte[] result = htable.get(get).getValue(Constants.ATTRIBUTE, + Bytes.toBytes("rows")); + + size = (result != null) ? Bytes.toInt(result) : 0; + + if (endRow.length == 0) { // the last split, we don't know the end row + totalRows = 0; // so we just skip it. + if (startRow.length == 0) + startRowId = 0; + else + startRowId = BytesUtil.getRowIndex(startRow); + endRowId = -1; + } else { + if (startRow.length == 0) { // the first split, start row is 0 + totalRows = BytesUtil.getRowIndex(endRow); + startRowId = 0; + endRowId = totalRows; + } else { + startRowId = BytesUtil.getRowIndex(startRow); + endRowId = BytesUtil.getRowIndex(endRow); + totalRows = startRowId - endRowId; + } + } + processedRows = 0; + LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows); + } + + public void restart(byte[] firstRow) throws IOException { + Scan newScan = new Scan(scan); + newScan.setStartRow(firstRow); + this.scanner = this.htable.getScanner(newScan); + } + + @Override + public void close() throws IOException { + this.scanner.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { + return NullWritable.get(); + } + + @Override + public NullWritable getCurrentValue() throws IOException, + InterruptedException { + return NullWritable.get(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + if (totalRows <= 0) { + return 0; + } else { + return Math.min(1.0f, processedRows / (float) totalRows); + } + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + Result vv; + try { + vv = this.scanner.next(); + } catch (IOException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + restart(lastRow); + scanner.next(); // skip presumed already mapped row + vv = scanner.next(); + } + + double s1, s2; + Put put; + + boolean hasMore = vv != null && vv.size() > 0; + if (hasMore) { + byte[] row = vv.getRow(); + int rowId = BytesUtil.getRowIndex(row); + if (rowId < pivotrow) { + Get get = new Get(BytesUtil.getRowIndex(rowId)); + s1 = Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(pivotrow)))); + s2 = 
Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(pivotcol)))); + + put = new Put(BytesUtil.getRowIndex(rowId)); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(pivotrow)), Bytes.toBytes(new Double(pivotcos * s1 + - pivotsin * s2))); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(pivotcol)), Bytes.toBytes(new Double(pivotsin * s1 + + pivotcos * s2))); + + htable.put(put); + } else if (rowId == pivotrow) { + return true; + } else if (rowId < pivotcol) { + Get get = new Get(BytesUtil.getRowIndex(pivotrow)); + s1 = Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(rowId)))); + get = new Get(BytesUtil.getRowIndex(rowId)); + + s2 = Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(pivotcol)))); + + put = new Put(BytesUtil.getRowIndex(rowId)); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(pivotcol)), Bytes.toBytes(new Double(pivotsin * s1 + + pivotcos * s2))); + htable.put(put); + + put = new Put(BytesUtil.getRowIndex(pivotrow)); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(rowId)), Bytes.toBytes(new Double(pivotcos * s1 + - pivotsin * s2))); + htable.put(put); + + } else if (rowId == pivotcol) { + for (int i = pivotcol + 1; i < size; i++) { + Get get = new Get(BytesUtil.getRowIndex(pivotrow)); + + s1 = Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(i)))); + + get = new Get(BytesUtil.getRowIndex(pivotcol)); + s2 = Bytes.toDouble(htable.get(get).getValue( + Bytes.toBytes(JacobiEigen.EICOL), + Bytes.toBytes(String.valueOf(i)))); + + double pivotC = (pivotsin * s1 + pivotcos * s2); + put = new Put(BytesUtil.getRowIndex(pivotcol)); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(i)), Bytes.toBytes(pivotC)); + htable.put(put); + + double pivotV = (pivotcos * s1 - pivotsin * s2); + put = new Put(BytesUtil.getRowIndex(pivotrow)); + put.add(Bytes.toBytes(JacobiEigen.EICOL), Bytes.toBytes(String + .valueOf(i)), Bytes.toBytes(pivotV)); + htable.put(put); + + } + } else { // rowId > pivotcol + return false; + } + + lastRow = row; + } + return hasMore; + } + + } + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + TableSplit tSplit = (TableSplit) split; + RotationRecordReader trr = this.rotationRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin, pivot_cos); + } + Scan sc = new Scan(this.scan); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setHTable(table); + trr.init(); + return trr; + } + + @Override + public List getSplits(JobContext context) throws IOException { + if (table == null) { + throw new IOException("No table was provided."); + } + byte[][] startKeys = table.getStartKeys(); + if (startKeys == null || startKeys.length == 0) { + throw new IOException("Expecting at least one region."); + } + int realNumSplits = startKeys.length; + InputSplit[] splits = new InputSplit[realNumSplits]; + int middle = startKeys.length / realNumSplits; + int startPos = 0; + for (int i = 0; i < realNumSplits; i++) { + int lastPos = startPos + middle; + lastPos = startKeys.length % 
realNumSplits > i ? lastPos + 1 : lastPos; + String regionLocation = table.getRegionLocation(startKeys[startPos]) + .getServerAddress().getHostname(); + splits[i] = new TableSplit(this.table.getTableName(), + startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] + : HConstants.EMPTY_START_ROW, regionLocation); + LOG.info("split: " + i + "->" + splits[i]); + startPos = lastPos; + } + return Arrays.asList(splits); + } + + protected HTable getHTable() { + return this.table; + } + + protected void setHTable(HTable table) { + this.table = table; + } + + public Scan getScan() { + if (this.scan == null) + this.scan = new Scan(); + return scan; + } + + public void setScan(Scan scan) { + this.scan = scan; + } + + protected void setTableRecordReader(RotationRecordReader rotationRecordReader) { + this.rotationRecordReader = rotationRecordReader; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + pivot_row = conf.getInt(JacobiEigen.PIVOTROW, -1); + pivot_col = conf.getInt(JacobiEigen.PIVOTCOL, -1); + pivot_sin = Double.parseDouble(conf.get(JacobiEigen.PIVOTSIN)); + pivot_cos = Double.parseDouble(conf.get(JacobiEigen.PIVOTCOS)); + + this.conf = conf; + String tableName = conf.get(INPUT_TABLE); + try { + setHTable(new HTable(new HBaseConfiguration(conf), tableName)); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + Scan scan = null; + try { + scan = PivotInputFormat.convertStringToScan(conf.get(SCAN)); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + setScan(scan); + } + +} Index: src/java/org/apache/hama/Constants.java =================================================================== --- src/java/org/apache/hama/Constants.java (리비전 942971) +++ src/java/org/apache/hama/Constants.java (작업 사본) @@ -85,28 +85,4 @@ public static final String BLOCK = "block"; public static final Text ROWCOUNT= new Text("row"); - - /** - * EigenValue Constants - */ - /** a matrix copy of the original copy collected in "eicol" family * */ - public static final String EICOL = "eicol"; - - /** a column family collect all values and statuses used during computation * */ - public static final String EI = "eival"; - public static final String EIVAL = "value"; - public static final String EICHANGED = "changed"; - - /** a column identify the index of the max absolute value each row * */ - public static final String EIIND = "ind"; - - /** a matrix collect all the eigen vectors * */ - public static final String EIVEC = "eivec"; - public static final String MATRIX = "hama.jacobieigenvalue.matrix"; - - /** parameters for pivot * */ - public static final String PIVOTROW = "hama.jacobi.pivot.row"; - public static final String PIVOTCOL = "hama.jacobi.pivot.col"; - public static final String PIVOTSIN = "hama.jacobi.pivot.sin"; - public static final String PIVOTCOS = "hama.jacobi.pivot.cos"; } Index: src/java/org/apache/hama/mapreduce/DummyMapper.java =================================================================== --- src/java/org/apache/hama/mapreduce/DummyMapper.java (리비전 942971) +++ src/java/org/apache/hama/mapreduce/DummyMapper.java (작업 사본) @@ -1,15 +0,0 @@ -package org.apache.hama.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Mapper; - -public class DummyMapper extends Mapper { - /** The dummy function. 
*/ - public void map(K key, V val, OutputCollector output, Reporter reporter) - throws IOException { - // do nothing - } -} Index: src/java/org/apache/hama/mapreduce/PivotInputFormat.java =================================================================== --- src/java/org/apache/hama/mapreduce/PivotInputFormat.java (리비전 942971) +++ src/java/org/apache/hama/mapreduce/PivotInputFormat.java (작업 사본) @@ -1,286 +0,0 @@ -package org.apache.hama.mapreduce; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.StringUtils; -import org.apache.hama.Constants; -import org.apache.hama.io.Pair; -import org.apache.hama.util.BytesUtil; - -public class PivotInputFormat extends InputFormat - implements Configurable { - final static Log LOG = LogFactory.getLog(PivotInputFormat.class); - - /** Job parameter that specifies the output table. */ - public static final String INPUT_TABLE = "hama.mapreduce.inputtable"; - /** Space delimited list of columns. */ - public static final String SCAN = "hama.mapreduce.scan"; - - /** The configuration. */ - private Configuration conf = null; - - /** Holds the details for the internal scanner. */ - private Scan scan = null; - /** The table to scan. */ - private HTable table = null; - /** The reader scanning the table, can be a custom one. */ - private PivotRecordReader pivotRecordReader = null; - - @Override - public List getSplits(JobContext context) throws IOException { - if (table == null) { - throw new IOException("No table was provided."); - } - byte[][] startKeys = table.getStartKeys(); - if (startKeys == null || startKeys.length == 0) { - throw new IOException("Expecting at least one region."); - } - int realNumSplits = startKeys.length; - InputSplit[] splits = new InputSplit[realNumSplits]; - int middle = startKeys.length / realNumSplits; - int startPos = 0; - for (int i = 0; i < realNumSplits; i++) { - int lastPos = startPos + middle; - lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; - String regionLocation = table.getRegionLocation(startKeys[startPos]) - .getServerAddress().getHostname(); - splits[i] = new TableSplit(this.table.getTableName(), - startKeys[startPos], ((i + 1) < realNumSplits) ? 
startKeys[lastPos] - : HConstants.EMPTY_START_ROW, regionLocation); - LOG.info("split: " + i + "->" + splits[i]); - startPos = lastPos; - } - return Arrays.asList(splits); - } - - protected static class PivotRecordReader extends - RecordReader { - private int totalRows; - private int processedRows; - private int size; - boolean mocked = true; - - private ResultScanner scanner = null; - private Scan scan = null; - private HTable htable = null; - private byte[] lastRow = null; - private Pair key = null; - private DoubleWritable value = null; - - @Override - public void close() { - this.scanner.close(); - } - - public void setScan(Scan scan) { - this.scan = scan; - } - - public void setHTable(HTable htable) { - this.htable = htable; - } - - public void init() throws IOException { - restart(scan.getStartRow()); - } - - public void restart(byte[] firstRow) throws IOException { - Scan newScan = new Scan(scan); - newScan.setStartRow(firstRow); - this.scanner = this.htable.getScanner(newScan); - } - - @Override - public Pair getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public DoubleWritable getCurrentValue() throws IOException, - InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - if (totalRows <= 0) { - return 0; - } else { - return Math.min(1.0f, processedRows / (float) totalRows); - } - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (key == null) - key = new Pair(); - if (value == null) - value = new DoubleWritable(); - - Result vv; - try { - vv = this.scanner.next(); - } catch (IOException e) { - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - restart(lastRow); - scanner.next(); // skip presumed already mapped row - vv = scanner.next(); - } - - boolean hasMore = vv != null && vv.size() > 0; - if (hasMore) { - - byte[] row = vv.getRow(); - - int rowId = BytesUtil.getRowIndex(row); - if (rowId == size - 1) { // skip the last row - if (mocked) { - key.set(Integer.MAX_VALUE, Integer.MAX_VALUE); - mocked = false; - return true; - } else { - return false; - } - } - - byte[] col = vv.getValue(Bytes - .toBytes(Constants.EI), Bytes - .toBytes(Constants.EIIND)); - int colId = BytesUtil.bytesToInt(col); - double val = 0; - - Get get = new Get(BytesUtil.getRowIndex(rowId)); - byte[] cell = htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(colId))); - if (cell != null) { - val = Bytes.toDouble(cell); - } - - key.set(rowId, colId); - value.set(val); - - lastRow = row; - } else { - if (mocked) { - key.set(Integer.MAX_VALUE, Integer.MAX_VALUE); - mocked = false; - return true; - } else { - return false; - } - } - - return hasMore; - } - } - - @Override - public RecordReader createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException { - TableSplit tSplit = (TableSplit) split; - PivotRecordReader trr = this.pivotRecordReader; - // if no table record reader was provided use default - if (trr == null) { - trr = new PivotRecordReader(); - } - Scan sc = new Scan(this.scan); - sc.setStartRow(tSplit.getStartRow()); - sc.setStopRow(tSplit.getEndRow()); - trr.setScan(sc); - trr.setHTable(table); - trr.init(); - return trr; - } - - protected HTable getHTable() { - return this.table; - } - - protected void setHTable(HTable table) 
{ - this.table = table; - } - - public Scan getScan() { - if (this.scan == null) - this.scan = new Scan(); - return scan; - } - - public void setScan(Scan scan) { - this.scan = scan; - } - - protected void setTableRecordReader(PivotRecordReader pivotRecordReader) { - this.pivotRecordReader = pivotRecordReader; - } - - @Override - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration configuration) { - this.conf = configuration; - String tableName = conf.get(INPUT_TABLE); - try { - setHTable(new HTable(new HBaseConfiguration(conf), tableName)); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - Scan scan = null; - try { - scan = convertStringToScan(conf.get(SCAN)); - } catch (IOException e) { - LOG.error("An error occurred.", e); - } - setScan(scan); - } - - public static String convertScanToString(Scan scan) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(out); - scan.write(dos); - return Base64.encodeBytes(out.toByteArray()); - } - - public static Scan convertStringToScan(String base64) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64)); - DataInputStream dis = new DataInputStream(bis); - Scan scan = new Scan(); - scan.readFields(dis); - return scan; - } - -} Index: src/java/org/apache/hama/mapreduce/RotationInputFormat.java =================================================================== --- src/java/org/apache/hama/mapreduce/RotationInputFormat.java (리비전 942971) +++ src/java/org/apache/hama/mapreduce/RotationInputFormat.java (작업 사본) @@ -1,354 +0,0 @@ -package org.apache.hama.mapreduce; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.StringUtils; -import org.apache.hama.Constants; -import org.apache.hama.util.BytesUtil; - -public class RotationInputFormat extends - InputFormat implements Configurable { - final static Log LOG = LogFactory.getLog(RotationInputFormat.class); - /** Job parameter that specifies the output table. */ - public static final String INPUT_TABLE = "hama.mapreduce.inputtable"; - /** Space delimited list of columns. */ - public static final String SCAN = "hama.mapreduce.scan"; - - /** The configuration. */ - private Configuration conf = null; - - /** Holds the details for the internal scanner. */ - private Scan scan = null; - /** The table to scan. */ - private HTable table = null; - /** The reader scanning the table, can be a custom one. 
*/ - private RotationRecordReader rotationRecordReader; - - int pivot_row, pivot_col; - double pivot_cos, pivot_sin; - - protected static class RotationRecordReader extends - RecordReader { - private ResultScanner scanner = null; - private Scan scan = null; - private HTable htable = null; - private byte[] lastRow = null; - - private int totalRows; - private int processedRows; - int startRowId, endRowId = -1; - int size; - - int pivotrow, pivotcol; - byte[] prow, pcol; - double pivotcos, pivotsin; - - public RotationRecordReader(int pr, int pc, double psin, double pcos) { - super(); - pivotrow = pr; - pivotcol = pc; - pivotsin = psin; - pivotcos = pcos; - prow = Bytes.toBytes(pivotrow); - pcol = Bytes.toBytes(pivotcol); - LOG.info(prow); - LOG.info(pcol); - } - - public void setScan(Scan scan) { - this.scan = scan; - } - - public void setHTable(HTable htable) { - this.htable = htable; - } - - public void init() throws IOException { - restart(scan.getStartRow()); - byte[] startRow = scan.getStartRow(); - byte[] endRow = scan.getStopRow(); - - Get get = new Get(Bytes.toBytes(Constants.METADATA)); - get.addFamily(Constants.ATTRIBUTE); - byte[] result = htable.get(get).getValue(Constants.ATTRIBUTE, - Bytes.toBytes("rows")); - - size = (result != null) ? Bytes.toInt(result) : 0; - - if (endRow.length == 0) { // the last split, we don't know the end row - totalRows = 0; // so we just skip it. - if (startRow.length == 0) - startRowId = 0; - else - startRowId = BytesUtil.getRowIndex(startRow); - endRowId = -1; - } else { - if (startRow.length == 0) { // the first split, start row is 0 - totalRows = BytesUtil.getRowIndex(endRow); - startRowId = 0; - endRowId = totalRows; - } else { - startRowId = BytesUtil.getRowIndex(startRow); - endRowId = BytesUtil.getRowIndex(endRow); - totalRows = startRowId - endRowId; - } - } - processedRows = 0; - LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows); - } - - public void restart(byte[] firstRow) throws IOException { - Scan newScan = new Scan(scan); - newScan.setStartRow(firstRow); - this.scanner = this.htable.getScanner(newScan); - } - - @Override - public void close() throws IOException { - this.scanner.close(); - } - - @Override - public NullWritable getCurrentKey() throws IOException, - InterruptedException { - return NullWritable.get(); - } - - @Override - public NullWritable getCurrentValue() throws IOException, - InterruptedException { - return NullWritable.get(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - if (totalRows <= 0) { - return 0; - } else { - return Math.min(1.0f, processedRows / (float) totalRows); - } - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - Result vv; - try { - vv = this.scanner.next(); - } catch (IOException e) { - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - restart(lastRow); - scanner.next(); // skip presumed already mapped row - vv = scanner.next(); - } - - double s1, s2; - Put put; - - boolean hasMore = vv != null && vv.size() > 0; - if (hasMore) { - byte[] row = vv.getRow(); - int rowId = BytesUtil.getRowIndex(row); - if (rowId < pivotrow) { - Get get = new Get(BytesUtil.getRowIndex(rowId)); - s1 = Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(pivotrow)))); - s2 = 
Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(pivotcol)))); - - put = new Put(BytesUtil.getRowIndex(rowId)); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(pivotrow)), Bytes.toBytes(new Double(pivotcos * s1 - - pivotsin * s2))); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(pivotcol)), Bytes.toBytes(new Double(pivotsin * s1 - + pivotcos * s2))); - - htable.put(put); - } else if (rowId == pivotrow) { - return true; - } else if (rowId < pivotcol) { - Get get = new Get(BytesUtil.getRowIndex(pivotrow)); - s1 = Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(rowId)))); - get = new Get(BytesUtil.getRowIndex(rowId)); - - s2 = Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(pivotcol)))); - - put = new Put(BytesUtil.getRowIndex(rowId)); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(pivotcol)), Bytes.toBytes(new Double(pivotsin * s1 - + pivotcos * s2))); - htable.put(put); - - put = new Put(BytesUtil.getRowIndex(pivotrow)); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(rowId)), Bytes.toBytes(new Double(pivotcos * s1 - - pivotsin * s2))); - htable.put(put); - - } else if (rowId == pivotcol) { - for (int i = pivotcol + 1; i < size; i++) { - Get get = new Get(BytesUtil.getRowIndex(pivotrow)); - - s1 = Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(i)))); - - get = new Get(BytesUtil.getRowIndex(pivotcol)); - s2 = Bytes.toDouble(htable.get(get).getValue( - Bytes.toBytes(Constants.EICOL), - Bytes.toBytes(String.valueOf(i)))); - - put = new Put(BytesUtil.getRowIndex(pivotcol)); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(i)), Bytes.toBytes(new Double(pivotsin * s1 + pivotcos - * s2))); - htable.put(put); - - put = new Put(BytesUtil.getRowIndex(pivotrow)); - put.add(Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String - .valueOf(i)), Bytes.toBytes(new Double(pivotcos * s1 - pivotsin - * s2))); - htable.put(put); - - } - } else { // rowId > pivotcol - return false; - } - - lastRow = row; - } - return hasMore; - } - - } - - @Override - public RecordReader createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException, - InterruptedException { - TableSplit tSplit = (TableSplit) split; - RotationRecordReader trr = this.rotationRecordReader; - // if no table record reader was provided use default - if (trr == null) { - trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin, pivot_cos); - } - Scan sc = new Scan(this.scan); - sc.setStartRow(tSplit.getStartRow()); - sc.setStopRow(tSplit.getEndRow()); - trr.setScan(sc); - trr.setHTable(table); - trr.init(); - return trr; - } - - @Override - public List getSplits(JobContext context) throws IOException { - if (table == null) { - throw new IOException("No table was provided."); - } - byte[][] startKeys = table.getStartKeys(); - if (startKeys == null || startKeys.length == 0) { - throw new IOException("Expecting at least one region."); - } - int realNumSplits = startKeys.length; - InputSplit[] splits = new InputSplit[realNumSplits]; - int middle = startKeys.length / realNumSplits; - int startPos = 0; - for (int i = 0; i < realNumSplits; i++) { - int lastPos = startPos + middle; - lastPos = startKeys.length % realNumSplits > i ? 
lastPos + 1 : lastPos; - String regionLocation = table.getRegionLocation(startKeys[startPos]) - .getServerAddress().getHostname(); - splits[i] = new TableSplit(this.table.getTableName(), - startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] - : HConstants.EMPTY_START_ROW, regionLocation); - LOG.info("split: " + i + "->" + splits[i]); - startPos = lastPos; - } - return Arrays.asList(splits); - } - - protected HTable getHTable() { - return this.table; - } - - protected void setHTable(HTable table) { - this.table = table; - } - - public Scan getScan() { - if (this.scan == null) - this.scan = new Scan(); - return scan; - } - - public void setScan(Scan scan) { - this.scan = scan; - } - - protected void setTableRecordReader(RotationRecordReader rotationRecordReader) { - this.rotationRecordReader = rotationRecordReader; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - pivot_row = conf.getInt(Constants.PIVOTROW, -1); - pivot_col = conf.getInt(Constants.PIVOTCOL, -1); - pivot_sin = Double.parseDouble(conf.get(Constants.PIVOTSIN)); - pivot_cos = Double.parseDouble(conf.get(Constants.PIVOTCOS)); - - this.conf = conf; - String tableName = conf.get(INPUT_TABLE); - try { - setHTable(new HTable(new HBaseConfiguration(conf), tableName)); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - Scan scan = null; - try { - scan = PivotInputFormat.convertStringToScan(conf.get(SCAN)); - } catch (IOException e) { - LOG.error("An error occurred.", e); - } - setScan(scan); - } - -} Index: src/java/org/apache/hama/matrix/AbstractMatrix.java =================================================================== --- src/java/org/apache/hama/matrix/AbstractMatrix.java (리비전 942971) +++ src/java/org/apache/hama/matrix/AbstractMatrix.java (작업 사본) @@ -71,6 +71,13 @@ protected boolean closed = true; + /** a matrix copy of the original copy collected in "eicol" family * */ + public static final String EICOL = "eicol"; + /** a column family collect all values and statuses used during computation * */ + public static final String EI = "eival"; + /** a matrix collect all the eigen vectors * */ + public static final String EIVEC = "eivec"; + /** * Sets the job configuration * @@ -131,11 +138,11 @@ .toBytes(Constants.BLOCK))); // the following families are used in JacobiEigenValue computation this.tableDesc.addFamily(new HColumnDescriptor(Bytes - .toBytes(Constants.EI))); + .toBytes(EI))); this.tableDesc.addFamily(new HColumnDescriptor(Bytes - .toBytes(Constants.EICOL))); + .toBytes(EICOL))); this.tableDesc.addFamily(new HColumnDescriptor(Bytes - .toBytes(Constants.EIVEC))); + .toBytes(EIVEC))); LOG.info("Initializing the matrix storage."); this.admin.createTable(this.tableDesc); @@ -289,9 +296,9 @@ scan.addFamily(Constants.ATTRIBUTE); scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY)); scan.addFamily(Bytes.toBytes(Constants.BLOCK)); - scan.addFamily(Bytes.toBytes(Constants.EI)); - scan.addFamily(Bytes.toBytes(Constants.EICOL)); - scan.addFamily(Bytes.toBytes(Constants.EIVEC)); + scan.addFamily(Bytes.toBytes(EI)); + scan.addFamily(Bytes.toBytes(EICOL)); + scan.addFamily(Bytes.toBytes(EIVEC)); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class, @@ -318,9 +325,9 @@ scan.addFamily(Constants.ATTRIBUTE); scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY)); scan.addFamily(Bytes.toBytes(Constants.BLOCK)); - 
Index: src/java/org/apache/hama/matrix/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/matrix/AbstractMatrix.java (revision 942971)
+++ src/java/org/apache/hama/matrix/AbstractMatrix.java (working copy)
@@ -71,6 +71,13 @@
 
   protected boolean closed = true;
 
+  /** a matrix copy of the original copy collected in "eicol" family * */
+  public static final String EICOL = "eicol";
+  /** a column family collect all values and statuses used during computation * */
+  public static final String EI = "eival";
+  /** a matrix collect all the eigen vectors * */
+  public static final String EIVEC = "eivec";
+
   /**
    * Sets the job configuration
    *
@@ -131,11 +138,11 @@
         .toBytes(Constants.BLOCK)));
     // the following families are used in JacobiEigenValue computation
     this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-        .toBytes(Constants.EI)));
+        .toBytes(EI)));
     this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-        .toBytes(Constants.EICOL)));
+        .toBytes(EICOL)));
     this.tableDesc.addFamily(new HColumnDescriptor(Bytes
-        .toBytes(Constants.EIVEC)));
+        .toBytes(EIVEC)));
 
     LOG.info("Initializing the matrix storage.");
     this.admin.createTable(this.tableDesc);
@@ -289,9 +296,9 @@
     scan.addFamily(Constants.ATTRIBUTE);
     scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
     scan.addFamily(Bytes.toBytes(Constants.BLOCK));
-    scan.addFamily(Bytes.toBytes(Constants.EI));
-    scan.addFamily(Bytes.toBytes(Constants.EICOL));
-    scan.addFamily(Bytes.toBytes(Constants.EIVEC));
+    scan.addFamily(Bytes.toBytes(EI));
+    scan.addFamily(Bytes.toBytes(EICOL));
+    scan.addFamily(Bytes.toBytes(EIVEC));
 
     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
         .getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
@@ -318,9 +325,9 @@
     scan.addFamily(Constants.ATTRIBUTE);
     scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
     scan.addFamily(Bytes.toBytes(Constants.BLOCK));
-    scan.addFamily(Bytes.toBytes(Constants.EI));
-    scan.addFamily(Bytes.toBytes(Constants.EICOL));
-    scan.addFamily(Bytes.toBytes(Constants.EIVEC));
+    scan.addFamily(Bytes.toBytes(EI));
+    scan.addFamily(Bytes.toBytes(EICOL));
+    scan.addFamily(Bytes.toBytes(EIVEC));
 
     Float f = new Float(alpha);
     job.getConfiguration().setFloat("set.alpha", f);
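These hunks are purely a move: the eigen column-family names now live on AbstractMatrix rather than org.apache.hama.Constants, as the replaced references show. A minimal sketch of how code outside the class would reference the families after the patch (the Scan here is illustrative, not part of the patch):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hama.matrix.AbstractMatrix;

    public class EigenFamilyScan {
      public static Scan eigenFamilies() {
        // the three families every matrix table declares for Jacobi runs
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(AbstractMatrix.EI));    // "eival"
        scan.addFamily(Bytes.toBytes(AbstractMatrix.EICOL)); // "eicol"
        scan.addFamily(Bytes.toBytes(AbstractMatrix.EIVEC)); // "eivec"
        return scan;
      }
    }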
Index: src/java/org/apache/hama/matrix/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/matrix/DenseMatrix.java (revision 942971)
+++ src/java/org/apache/hama/matrix/DenseMatrix.java (working copy)
@@ -23,37 +23,23 @@
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.io.Pair;
-import org.apache.hama.mapreduce.DummyMapper;
-import org.apache.hama.mapreduce.PivotInputFormat;
-import org.apache.hama.mapreduce.RotationInputFormat;
-import org.apache.hama.matrix.algebra.JacobiInitMap;
 import org.apache.hama.matrix.algebra.MatrixAdditionMap;
 import org.apache.hama.matrix.algebra.MatrixAdditionReduce;
-import org.apache.hama.matrix.algebra.PivotMap;
 import org.apache.hama.util.BytesUtil;
 import org.apache.hama.util.RandomVariable;
 
@@ -503,286 +489,4 @@
 
     return result;
   }
-
-
-  /**
-   * Compute all the eigen values. Note: all the eigen values are collected in
-   * the "eival:value" column, and the eigen vector of a specified eigen value
-   * is collected in the "eivec:" column family in the same row.
-   *
-   * TODO: we may need to expose the interface to access the eigen values and
-   * vectors
-   *
-   * @param imax limit the loops of the computation
-   * @throws IOException
-   */
-  public void jacobiEigenValue(int imax) throws IOException {
-    /*
-     * Initialization A M/R job is used for initialization(such as, preparing a
-     * matrx copy of the original in "eicol:" family.)
-     */
-    // initialization
-    Job job = new Job(config, "JacobiEigen initialization MR job" + getPath());
-
-    Scan scan = new Scan();
-    scan.addFamily(Constants.COLUMNFAMILY);
-
-    TableMapReduceUtil.initTableMapperJob(getPath(), scan, JacobiInitMap.class,
-        ImmutableBytesWritable.class, Put.class, job);
-    TableMapReduceUtil.initTableReducerJob(getPath(),
-        IdentityTableReducer.class, job);
-
-    try {
-      job.waitForCompletion(true);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
-    }
-
-    final FileSystem fs = FileSystem.get(config);
-    Pair pivotPair = new Pair();
-    DoubleWritable pivotWritable = new DoubleWritable();
-    Put put;
-
-    // loop
-    int size = this.getRows();
-    int state = size;
-    int pivot_row, pivot_col;
-    double pivot;
-    double s, c, t, y;
-
-    int icount = 0;
-    while (state != 0 && icount < imax) {
-      icount = icount + 1;
-      /*
-       * Find the pivot and its index(pivot_row, pivot_col) A M/R job is used to
-       * scan all the "eival:ind" to get the max absolute value of each row, and
-       * do a MAX aggregation of these max values to get the max value in the
-       * matrix.
-       */
-      Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_"
-          + System.currentTimeMillis()), "out");
-      if (fs.exists(outDir))
-        fs.delete(outDir, true);
-
-      job = new Job(config, "Find Pivot MR job" + getPath());
-
-      scan = new Scan();
-      scan.addFamily(Bytes.toBytes(Constants.EI));
-
-      job.setInputFormatClass(PivotInputFormat.class);
-      job.setMapOutputKeyClass(Pair.class);
-      job.setMapOutputValueClass(DoubleWritable.class);
-      job.setMapperClass(PivotMap.class);
-      job.getConfiguration().set(PivotInputFormat.INPUT_TABLE, getPath());
-      job.getConfiguration().set(PivotInputFormat.SCAN,
-          PivotInputFormat.convertScanToString(scan));
-
-      job.setOutputKeyClass(Pair.class);
-      job.setOutputValueClass(DoubleWritable.class);
-      job.setOutputFormatClass(SequenceFileOutputFormat.class);
-      SequenceFileOutputFormat.setOutputPath(job, outDir);
-
-      try {
-        job.waitForCompletion(true);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (ClassNotFoundException e) {
-        e.printStackTrace();
-      }
-
-      // read outputs
-      Path inFile = new Path(outDir, "part-r-00000");
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
-      try {
-        reader.next(pivotPair, pivotWritable);
-        pivot_row = pivotPair.getRow();
-        pivot_col = pivotPair.getColumn();
-        pivot = pivotWritable.get();
-      } finally {
-        reader.close();
-      }
-      fs.delete(outDir, true);
-      fs.delete(outDir.getParent(), true);
-
-      if (pivot_row == 0 && pivot_col == 0)
-        break; // stop the iterations
-
-      /*
-       * Calculation Compute the rotation parameters of next rotation.
-       */
-      Get get = new Get(BytesUtil.getRowIndex(pivot_row));
-      get.addFamily(Bytes.toBytes(Constants.EI));
-      Result r = table.get(get);
-      double e1 = Bytes.toDouble(r.getValue(Bytes.toBytes(Constants.EI),
-          Bytes.toBytes(Constants.EIVAL)));
-
-      get = new Get(BytesUtil.getRowIndex(pivot_col));
-      get.addFamily(Bytes.toBytes(Constants.EI));
-      r = table.get(get);
-      double e2 = Bytes.toDouble(r.getValue(Bytes.toBytes(Constants.EI),
-          Bytes.toBytes(Constants.EIVAL)));
-
-      y = (e2 - e1) / 2;
-      t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
-      s = Math.sqrt(pivot * pivot + t * t);
-      c = t / s;
-      s = pivot / s;
-      t = (pivot * pivot) / t;
-      if (y < 0) {
-        s = -s;
-        t = -t;
-      }
-
-      /*
-       * Upate the pivot and the eigen values indexed by the pivot
-       */
-      put = new Put(BytesUtil.getRowIndex(pivot_row));
-      put.add(Bytes.toBytes(Constants.EICOL),
-          Bytes.toBytes(String.valueOf(pivot_col)), Bytes.toBytes(0.0));
-      table.put(put);
-
-      state = update(pivot_row, -t, state);
-      state = update(pivot_col, t, state);
-
-      /*
-       * Rotation the matrix
-       */
-      job = new Job(config, "Rotation Matrix MR job" + getPath());
-
-      scan = new Scan();
-      scan.addFamily(Bytes.toBytes(Constants.EI));
-
-      job.getConfiguration().setInt(Constants.PIVOTROW, pivot_row);
-      job.getConfiguration().setInt(Constants.PIVOTCOL, pivot_col);
-      job.getConfiguration().set(Constants.PIVOTSIN, String.valueOf(s));
-      job.getConfiguration().set(Constants.PIVOTCOS, String.valueOf(c));
-
-      job.setInputFormatClass(RotationInputFormat.class);
-      job.setMapOutputKeyClass(NullWritable.class);
-      job.setMapOutputValueClass(NullWritable.class);
-      job.setMapperClass(DummyMapper.class);
-      job.getConfiguration().set(RotationInputFormat.INPUT_TABLE, getPath());
-      job.getConfiguration().set(RotationInputFormat.SCAN,
-          PivotInputFormat.convertScanToString(scan));
-      job.setOutputFormatClass(NullOutputFormat.class);
-
-      try {
-        job.waitForCompletion(true);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (ClassNotFoundException e) {
-        e.printStackTrace();
-      }
-
-      // rotate eigenvectors
-      LOG.info("rotating eigenvector");
-      for (int i = 0; i < size; i++) {
-        get = new Get(BytesUtil.getRowIndex(pivot_row));
-        e1 = Bytes.toDouble(table.get(get).getValue(
-            Bytes.toBytes(Constants.EIVEC), Bytes.toBytes(String.valueOf(i))));
-
-        get = new Get(BytesUtil.getRowIndex(pivot_col));
-        e2 = Bytes.toDouble(table.get(get).getValue(
-            Bytes.toBytes(Constants.EIVEC), Bytes.toBytes(String.valueOf(i))));
-
-        put = new Put(BytesUtil.getRowIndex(pivot_row));
-        put.add(Bytes.toBytes(Constants.EIVEC),
-            Bytes.toBytes(String.valueOf(i)),
-            Bytes.toBytes(new Double(c * e1 - s * e2)));
-        table.put(put);
-
-        put = new Put(BytesUtil.getRowIndex(pivot_col));
-        put.add(Bytes.toBytes(Constants.EIVEC),
-            Bytes.toBytes(String.valueOf(i)),
-            Bytes.toBytes(new Double(s * e1 + c * e2)));
-        table.put(put);
-      }
-
-      LOG.info("update index...");
-      // update index array
-      maxind(pivot_row, size);
-      maxind(pivot_col, size);
-    }
-  }
-
-  void maxind(int row, int size) throws IOException {
-    int m = row + 1;
-    Get get = null;
-    if (row + 2 < size) {
-      get = new Get(BytesUtil.getRowIndex(row));
-
-      double max = Bytes.toDouble(table.get(get).getValue(
-          Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String.valueOf(m))));
-      double val;
-      for (int i = row + 2; i < size; i++) {
-        get = new Get(BytesUtil.getRowIndex(row));
-        val = Bytes.toDouble(table.get(get).getValue(
-            Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String.valueOf(i))));
-        if (Math.abs(val) > Math.abs(max)) {
-          m = i;
-          max = val;
-        }
-      }
-    }
-
-    Put put = new Put(BytesUtil.getRowIndex(row));
-    put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes("ind"),
-        Bytes.toBytes(String.valueOf(m)));
-    table.put(put);
-  }
-
-  int update(int row, double value, int state) throws IOException {
-    Get get = new Get(BytesUtil.getRowIndex(row));
-    double e = Bytes.toDouble(table.get(get).getValue(
-        Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EIVAL)));
-    int changed = BytesUtil.bytesToInt(table.get(get).getValue(
-        Bytes.toBytes(Constants.EI), Bytes.toBytes("changed")));
-    double y = e;
-    e += value;
-
-    Put put = new Put(BytesUtil.getRowIndex(row));
-    put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EIVAL),
-        Bytes.toBytes(e));
-
-    if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e
-      changed = 0;
-      put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EICHANGED),
-          Bytes.toBytes(String.valueOf(changed)));
-
-      state--;
-    } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
-      changed = 1;
-      put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EICHANGED),
-          Bytes.toBytes(String.valueOf(changed)));
-
-      state++;
-    }
-    table.put(put);
-    return state;
-  }
-
-  // for test
-  boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
-    boolean success = true;
-    double e1, ev;
-    Get get = null;
-    for (int i = 0; i < e.length; i++) {
-      get = new Get(BytesUtil.getRowIndex(i));
-      e1 = Bytes.toDouble(table.get(get).getValue(Bytes.toBytes(Constants.EI),
-          Bytes.toBytes(Constants.EIVAL)));
-      success &= ((Math.abs(e1 - e[i]) < .0000001));
-      if (!success)
-        return success;
-
-      for (int j = 0; j < E[i].length; j++) {
-        get = new Get(BytesUtil.getRowIndex(i));
-        ev = Bytes.toDouble(table.get(get).getValue(
-            Bytes.toBytes(Constants.EIVEC), Bytes.toBytes(String.valueOf(j))));
-        success &= ((Math.abs(ev - E[i][j]) < .0000001));
-        if (!success)
-          return success;
-      }
-    }
-    return success;
-  }
 }
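With jacobiEigenValue(), maxind(), update(), and verifyEigenValue() gone from DenseMatrix, callers switch from the instance method to the static entry point in the examples package, as the Benchmarks and TestSingularValueDecomposition hunks below show. A minimal before/after sketch (matrix size and iteration cap are arbitrary):

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.examples.JacobiEigen;
    import org.apache.hama.matrix.DenseMatrix;

    public class JacobiMigration {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        DenseMatrix m = new DenseMatrix(conf, 6, 6);
        // before this patch: m.jacobiEigenValue(100);
        JacobiEigen.jacobiEigenValue(m, 100); // after: static, matrix passed in
      }
    }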
Index: src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java (revision 942971)
+++ src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java (working copy)
@@ -1,82 +0,0 @@
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hama.Constants;
-import org.apache.hama.util.BytesUtil;
-
-/**
- * The matrix will be modified during computing eigen value. So a new matrix
- * will be created to prevent the original matrix being modified. To reduce the
- * network transfer, we copy the "column" family in the original matrix to a
- * "eicol" family. All the following modification will be done over "eicol"
- * family.
- *
- * And the output Eigen Vector Arrays "eivec", and the output eigen value array
- * "eival:value", and the temp status array "eival:changed", "eival:ind" will be
- * created.
- *
- * Also "eival:state" will record the state of the rotation state of a matrix
- */
-public class JacobiInitMap extends TableMapper<ImmutableBytesWritable, Put> {
-
-  public void map(ImmutableBytesWritable key, Result value, Context context)
-      throws IOException, InterruptedException {
-    int row, col;
-    row = BytesUtil.getRowIndex(key.get());
-    Put put = new Put(BytesUtil.getRowIndex(row));
-
-    double val;
-    double maxVal = Double.MIN_VALUE;
-    int maxInd = row + 1;
-
-    boolean init = true;
-
-    NavigableMap<byte[], byte[]> map = value
-        .getFamilyMap(Constants.COLUMNFAMILY);
-    for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
-      val = Bytes.toDouble(e.getValue());
-      col = BytesUtil.bytesToInt(e.getKey());
-      // copy the original matrix to "EICOL" family
-      put.add(Bytes.toBytes(Constants.EICOL),
-          Bytes.toBytes(String.valueOf(col)), Bytes.toBytes(val));
-      // make the "EIVEC" a dialog matrix
-      put.add(Bytes.toBytes(Constants.EIVEC),
-          Bytes.toBytes(String.valueOf(col)),
-          Bytes.toBytes(col == row ? new Double(1) : new Double(0)));
-
-      if (col == row) {
-        put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EIVAL),
-            Bytes.toBytes(val));
-      }
-      // find the max index
-      if (col > row) {
-        if (init) {
-          maxInd = col;
-          maxVal = val;
-          init = false;
-        } else {
-          if (Math.abs(val) > Math.abs(maxVal)) {
-            maxVal = val;
-            maxInd = col;
-          }
-        }
-      }
-    }
-
-    // index array
-    put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EIIND),
-        Bytes.toBytes(String.valueOf(maxInd)));
-    // Changed Array set to be true during initialization
-    put.add(Bytes.toBytes(Constants.EI), Bytes.toBytes(Constants.EICHANGED),
-        Bytes.toBytes(String.valueOf(1)));
-
-    context.write(key, put);
-  }
-}
Index: src/java/org/apache/hama/matrix/algebra/PivotMap.java
===================================================================
--- src/java/org/apache/hama/matrix/algebra/PivotMap.java (revision 942971)
+++ src/java/org/apache/hama/matrix/algebra/PivotMap.java (working copy)
@@ -1,29 +0,0 @@
-package org.apache.hama.matrix.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hama.io.Pair;
-
-public class PivotMap extends
-    Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
-  private double max = 0;
-  private Pair pair = new Pair(0, 0);
-  private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
-  private DoubleWritable dummyVal = new DoubleWritable(0.0);
-
-  public void map(Pair key, DoubleWritable value, Context context)
-      throws IOException, InterruptedException {
-    if (key.getRow() != Integer.MAX_VALUE) {
-      if (Math.abs(value.get()) > Math.abs(max)) {
-        pair.set(key.getRow(), key.getColumn());
-        max = value.get();
-      }
-    } else {
-      context.write(pair, new DoubleWritable(max));
-      context.write(dummyPair, dummyVal);
-    }
-  }
-
-}
Index: src/test/org/apache/hama/Benchmarks.java
===================================================================
--- src/test/org/apache/hama/Benchmarks.java (revision 943351)
+++ src/test/org/apache/hama/Benchmarks.java (working copy)
@@ -17,7 +17,7 @@
         .parseInt(args[0]), Integer.parseInt(args[0]));
 
     double start = System.currentTimeMillis();
-    rand.jacobiEigenValue(Integer.parseInt(args[1]));
+    JacobiEigen.jacobiEigenValue(rand, Integer.parseInt(args[1]));
     double end = System.currentTimeMillis();
     System.out.println("Runtime: " + ((end - start)) * 1000 + " sec");
   }
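The PivotMap deleted above keeps a running per-mapper maximum and only emits it when it sees the Pair(Integer.MAX_VALUE, Integer.MAX_VALUE) key, which PivotInputFormat presumably appends as the final record of each split; that sentinel is how a stateful mapper with no cleanup hook learns its input is exhausted. A self-contained sketch of the flush-on-sentinel pattern (no Hama types, all names illustrative):

    public class SentinelFlush {
      public static void main(String[] args) {
        // (row, col, value) records; the final record is the sentinel
        int[][] keys = { { 0, 1 }, { 1, 3 }, { 2, 2 },
            { Integer.MAX_VALUE, Integer.MAX_VALUE } };
        double[] vals = { -5.0, 7.5, -9.25, 0.0 };

        int maxRow = 0, maxCol = 0;
        double max = 0;
        for (int i = 0; i < keys.length; i++) {
          if (keys[i][0] != Integer.MAX_VALUE) {
            // track the entry with the largest absolute value, like PivotMap
            if (Math.abs(vals[i]) > Math.abs(max)) {
              maxRow = keys[i][0];
              maxCol = keys[i][1];
              max = vals[i];
            }
          } else {
            // the sentinel marks end-of-input: flush the local maximum
            System.out.println("(" + maxRow + "," + maxCol + ") -> " + max);
          }
        }
      }
    }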
Index: src/test/org/apache/hama/examples/TestJacobiEigenValue.java
===================================================================
--- src/test/org/apache/hama/examples/TestJacobiEigenValue.java (revision 0)
+++ src/test/org/apache/hama/examples/TestJacobiEigenValue.java (revision 0)
@@ -0,0 +1,182 @@
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.JacobiEigen;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.TestDenseMatrix;
+import org.apache.log4j.Logger;
+
+public class TestJacobiEigenValue extends HamaCluster {
+  static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
+  private int SIZE = 6;
+  private Matrix m5;
+  private HamaConfiguration conf;
+
+  /**
+   * The correct eigenvalues are {11.099019513612875, 0.9009804864500339,
+   * 0.585786437634226, 3.4142135624028565, 1.0, 1.0}
+   */
+  private double[][] A = new double[][] { { 4, 3, 2, 1, 0, 0 },
+      { 3, 4, 3, 2, 0, 0 }, { 2, 3, 4, 3, 0, 0 }, { 1, 2, 3, 4, 0, 0 },
+      { 0, 0, 0, 0, 1, 0 }, { 0, 0, 0, 0, 0, 1 } };
+
+  /**
+   * @throws UnsupportedEncodingException
+   */
+  public TestJacobiEigenValue() throws UnsupportedEncodingException {
+    super();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+
+    conf = getConf();
+    m5 = new DenseMatrix(conf, SIZE, SIZE);
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        m5.set(i, j, A[i][j]);
+      }
+    }
+  }
+
+  public void testJacobiEigenValue() throws IOException {
+    // copy Matrix m5 to the array
+    double[][] S = A;
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        S[i][j] = m5.get(i, j);
+      }
+    }
+
+    // do the Jacobi eigenvalue computation over the S array
+    int i, j, k, l, m, state;
+    double s, c, t, p, y;
+    double e1, e2;
+    // index array
+    int[] ind = new int[SIZE];
+    boolean[] changed = new boolean[SIZE];
+
+    // output
+    double[] e = new double[SIZE];
+    double[][] E = new double[SIZE][SIZE];
+
+    // init e & E; ind & changed
+    for (i = 0; i < SIZE; i++) {
+      for (j = 0; j < SIZE; j++) {
+        E[i][j] = 0;
+      }
+      E[i][i] = 1;
+    }
+
+    state = SIZE;
+
+    for (i = 0; i < SIZE; i++) {
+      ind[i] = maxind(S, i, SIZE);
+      e[i] = S[i][i];
+      changed[i] = true;
+    }
+
+    int loops = 100;
+    int icount = 0;
+    // next rotation
+    while (state != 0 && icount < loops) {
+      icount = icount + 1;
+      // find index (k, l) for pivot p
+      m = 0;
+
+      for (k = 1; k <= SIZE - 2; k++) {
+        if (Math.abs(S[m][ind[m]]) < Math.abs(S[k][ind[k]])) {
+          m = k;
+        }
+      }
+
+      k = m;
+      l = ind[m];
+      p = S[k][l];
+
+      // calculate c = cos, s = sin
+      y = (e[l] - e[k]) / 2;
+      t = Math.abs(y) + Math.sqrt(p * p + y * y);
+      s = Math.sqrt(p * p + t * t);
+      c = t / s;
+      s = p / s;
+      t = (p * p) / t;
+      if (y < 0) {
+        s = -s;
+        t = -t;
+      }
+
+      S[k][l] = 0.0;
+      state = update(e, changed, k, -t, state);
+      state = update(e, changed, l, t, state);
+
+      for (i = 0; i <= k - 1; i++)
+        rotate(S, i, k, i, l, c, s);
+      for (i = l + 1; i < SIZE; i++)
+        rotate(S, k, i, l, i, c, s);
+      for (i = k + 1; i <= l - 1; i++)
+        rotate(S, k, i, i, l, c, s);
+
+      // rotate eigenvectors
+      for (i = 0; i < SIZE; i++) {
+        e1 = E[k][i];
+        e2 = E[l][i];
+
+        E[k][i] = c * e1 - s * e2;
+        E[l][i] = s * e1 + c * e2;
+      }
+
+      ind[k] = maxind(S, k, SIZE);
+      ind[l] = maxind(S, l, SIZE);
+
+      loops--;
+    }
+
+    for (int x = 0; x < SIZE; x++) {
+      System.out.println(e[x]);
+    }
+
+    // do the M/R Jacobi eigenvalue computation
+    DenseMatrix dm = (DenseMatrix) m5;
+    JacobiEigen.jacobiEigenValue(dm, 100);
+
+    // verify the results
+    assertTrue(JacobiEigen.verifyEigenValue(e, E));
+  }
+
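For reference, the parameter block above (and the identical arithmetic in the M/R loop) is the standard Jacobi rotation choice. Writing $p = S[k][l]$ for the pivot and $e_k$, $e_l$ for the current diagonal estimates, and taking $\operatorname{sign}(0) = 1$:

\[
y = \frac{e_l - e_k}{2}, \qquad
d = |y| + \sqrt{p^2 + y^2}, \qquad
r = \sqrt{p^2 + d^2},
\]
\[
\cos\theta = \frac{d}{r}, \qquad
\sin\theta = \operatorname{sign}(y)\,\frac{p}{r}, \qquad
t = p\tan\theta = \operatorname{sign}(y)\,\frac{p^2}{d}.
\]

The two update() calls then shift the eigenvalue estimates by $-t$ and $+t$, and the rotation annihilates the pivot, which the code also enforces by assigning S[k][l] = 0.0 (the M/R variant writes an explicit 0.0 into the corresponding "eicol" cell).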
+  // index of largest off-diagonal element in row k
+  int maxind(double[][] S, int row, int size) {
+    int m = row + 1;
+    for (int i = row + 2; i < size; i++) {
+      if (Math.abs(S[row][i]) > Math.abs(S[row][m]))
+        m = i;
+    }
+    return m;
+  }
+
+  int update(double[] e, boolean[] changed, int row, double value, int state) {
+    double y = e[row];
+    e[row] += value;
+
+    if (changed[row] && y == e[row]) {
+      changed[row] = false;
+      return state - 1;
+    } else if (!changed[row] && y != e[row]) {
+      changed[row] = true;
+      return state + 1;
+    } else
+      return state;
+  }
+
+  void rotate(double[][] S, int k, int l, int i, int j, double c, double s) {
+    double s1 = S[k][l], s2 = S[i][j];
+    S[k][l] = c * s1 - s * s2;
+    S[i][j] = s * s1 + c * s2;
+  }
+}
Index: src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (revision 943351)
+++ src/test/org/apache/hama/mapreduce/TestRandomMatrixMapReduce.java (working copy)
@@ -22,11 +22,10 @@
 import java.io.IOException;
 
 import org.apache.hama.HamaCluster;
+import org.apache.hama.examples.RandomMatrix;
 import org.apache.hama.matrix.DenseMatrix;
 import org.apache.hama.matrix.SparseMatrix;
 import org.apache.log4j.Logger;
-import org.apache.hama.examples.mapreduce.*;
-import org.apache.hama.examples.*;
 
 public class TestRandomMatrixMapReduce extends HamaCluster {
   static final Logger LOG = Logger.getLogger(TestRandomMatrixMapReduce.class);
@@ -35,7 +34,7 @@
     DenseMatrix rand = RandomMatrix.random_mapred(conf, 20, 20);
     assertEquals(20, rand.getRows());
     assertEquals(20, rand.getColumns());
-    
+
     for(int i = 0; i < 20; i++) {
       for(int j = 0; j < 20; j++) {
         assertTrue(rand.get(i, j) > 0);
Index: src/test/org/apache/hama/matrix/TestJacobiEigenValue.java
===================================================================
--- src/test/org/apache/hama/matrix/TestJacobiEigenValue.java (revision 943351)
+++ src/test/org/apache/hama/matrix/TestJacobiEigenValue.java (working copy)
@@ -1,178 +0,0 @@
-package org.apache.hama.matrix;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.hama.HamaCluster;
-import org.apache.hama.HamaConfiguration;
-import org.apache.log4j.Logger;
-
-public class TestJacobiEigenValue extends HamaCluster {
-  static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
-  private int SIZE = 6;
-  private Matrix m5;
-  private HamaConfiguration conf;
-
-  /**
-   * The correct EigenValues are {11.099019513612875, 0.9009804864500339,
-   * 0.585786437634226 , 3.4142135624028565, 1.0, 1.0 }
-   */
-  private double[][] A = new double[][] { { 4, 3, 2, 1, 0, 0 },
-      { 3, 4, 3, 2, 0, 0 }, { 2, 3, 4, 3, 0, 0 }, { 1, 2, 3, 4, 0, 0 },
-      { 0, 0, 0, 0, 1, 0 }, { 0, 0, 0, 0, 0, 1 } };
-
-  /**
-   * @throws UnsupportedEncodingException
-   */
-  public TestJacobiEigenValue() throws UnsupportedEncodingException {
-    super();
-  }
-
-  public void setUp() throws Exception {
-    super.setUp();
-
-    conf = getConf();
-    m5 = new DenseMatrix(conf, SIZE, SIZE);
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        m5.set(i, j, A[i][j]);
-      }
-    }
-  }
-
-  public void testJacobiEigenValue() throws IOException {
-    // copy Matrix m5 to the array
-    double[][] S = A;
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        S[i][j] = m5.get(i, j);
-      }
-    }
-
-    // do jacobi egien value over S array
-    int i, j, k, l, m, state;
-    double s, c, t, p, y;
-    double e1, e2;
-    // index array
-    int[] ind = new int[SIZE];
-    boolean[] changed = new boolean[SIZE];
-
-    // output
-    double[] e = new double[SIZE];
-    double[][] E = new double[SIZE][SIZE];
-
-    // init e & E; ind & changed
-    for (i = 0; i < SIZE; i++) {
-      for (j = 0; j < SIZE; j++) {
-        E[i][j] = 0;
-      }
-      E[i][i] = 1;
-    }
-
-    state = SIZE;
-
-    for (i = 0; i < SIZE; i++) {
-      ind[i] = maxind(S, i, SIZE);
-      e[i] = S[i][i];
-      changed[i] = true;
-    }
-
-    int loops = 100;
-    int icount = 0;
-    // next rotation
-    while (state != 0 && icount < loops) {
-      icount = icount + 1;
-      // find index(k, l) for pivot p
-      m = 0;
-
-      for (k = 1; k <= SIZE - 2; k++) {
-        if (Math.abs(S[m][ind[m]]) < Math.abs(S[k][ind[k]])) {
-          m = k;
-        }
-      }
-
-      k = m;
-      l = ind[m];
-      p = S[k][l];
-
-      // calculate c = cos, s = sin
-      y = (e[l] - e[k]) / 2;
-      t = Math.abs(y) + Math.sqrt(p * p + y * y);
-      s = Math.sqrt(p * p + t * t);
-      c = t / s;
-      s = p / s;
-      t = (p * p) / t;
-      if (y < 0) {
-        s = -s;
-        t = -t;
-      }
-
-      S[k][l] = 0.0;
-      state = update(e, changed, k, -t, state);
-      state = update(e, changed, l, t, state);
-
-      for (i = 0; i <= k - 1; i++)
-        rotate(S, i, k, i, l, c, s);
-      for (i = l + 1; i < SIZE; i++)
-        rotate(S, k, i, l, i, c, s);
-      for (i = k + 1; i <= l - 1; i++)
-        rotate(S, k, i, i, l, c, s);
-
-      // rotate eigenvectors
-      for (i = 0; i < SIZE; i++) {
-        e1 = E[k][i];
-        e2 = E[l][i];
-
-        E[k][i] = c * e1 - s * e2;
-        E[l][i] = s * e1 + c * e2;
-      }
-
-      ind[k] = maxind(S, k, SIZE);
-      ind[l] = maxind(S, l, SIZE);
-
-      loops--;
-    }
-
-    for (int x = 0; x < SIZE; x++) {
-      System.out.println(e[x]);
-    }
-
-    // do m/r jacobi eigen value computation
-    DenseMatrix dm = (DenseMatrix) m5;
-    dm.jacobiEigenValue(100);
-
-    // verify the results
-    assertTrue(dm.verifyEigenValue(e, E));
-  }
-
-  // index of largest off-diagonal element in row k
-  int maxind(double[][] S, int row, int size) {
-    int m = row + 1;
-    for (int i = row + 2; i < size; i++) {
-      if (Math.abs(S[row][i]) > Math.abs(S[row][m]))
-        m = i;
-    }
-    return m;
-  }
-
-  int update(double[] e, boolean[] changed, int row, double value, int state) {
-    double y = e[row];
-    e[row] += value;
-
-    if (changed[row] && y == e[row]) {
-      changed[row] = false;
-      return state - 1;
-    } else if (!changed[row] && y != e[row]) {
-      changed[row] = true;
-      return state + 1;
-    } else
-      return state;
-  }
-
-  void rotate(double[][] S, int k, int l, int i, int j, double c, double s) {
-    double s1 = S[k][l], s2 = S[i][j];
-    S[k][l] = c * s1 - s * s2;
-    S[i][j] = s * s1 + c * s2;
-  }
-}
Index: src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
===================================================================
--- src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (revision 943351)
+++ src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (working copy)
@@ -29,6 +29,7 @@
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.JacobiEigen;
 import org.apache.hama.examples.MatrixMultiplication;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
@@ -76,7 +77,7 @@
     }
 
     // Find the eigen/singular values and vectors of A'A
-    aTa.jacobiEigenValue(100);
+    JacobiEigen.jacobiEigenValue(aTa, 100);
 
     HTable table = aTa.getHTable();
     for (int x = 0; x < 2; x++) {