Index: src/java/org/apache/hama/AbstractMatrix.java
===================================================================
--- src/java/org/apache/hama/AbstractMatrix.java	(revision 728567)
+++ src/java/org/apache/hama/AbstractMatrix.java	(working copy)
@@ -97,6 +97,10 @@
     return this.table;
   }
 
+  public void commit(VectorUpdate update) throws IOException {
+    this.table.commit(update.getBatchUpdate());
+  }
+
   /** {@inheritDoc} */
   public int getRows() throws IOException {
     Cell rows = null;
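Usage note: the new commit(VectorUpdate) helper on AbstractMatrix lets callers stage and flush a row mutation without reaching for the HTable handle. A minimal sketch of the intended call pattern, assuming a reachable HBase cluster and the DenseMatrix/VectorUpdate APIs used elsewhere in this patch (illustrative, not a tested snippet):

    import java.io.IOException;
    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.io.VectorUpdate;

    public class CommitSketch {
      public static void main(String[] args) throws IOException {
        DenseMatrix m = new DenseMatrix(new HamaConfiguration());
        VectorUpdate update = new VectorUpdate(0); // row 0
        update.put(0, 1.0);                        // column 0
        update.put(1, 2.0);                        // column 1
        m.commit(update); // wraps table.commit(update.getBatchUpdate())
      }
    }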
Index: src/java/org/apache/hama/AbstractVector.java
===================================================================
--- src/java/org/apache/hama/AbstractVector.java	(revision 728567)
+++ src/java/org/apache/hama/AbstractVector.java	(working copy)
@@ -22,7 +22,7 @@
 import java.util.Iterator;
 
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HamaMapWritable;
 
 /**
  * Methods of the vector classes
@@ -28,7 +28,7 @@
  * Methods of the vector classes
  */
 public abstract class AbstractVector {
-  public MapWritable<DoubleEntry> entries;
+  public HamaMapWritable<DoubleEntry> entries;
 
   /**
    * Gets the value of index
@@ -56,7 +56,7 @@
   public void set(int index, double value) {
     // If entries are null, create new object
     if(this.entries == null) {
-      this.entries = new MapWritable<DoubleEntry>();
+      this.entries = new HamaMapWritable<DoubleEntry>();
     }
 
     this.entries.put(index, new DoubleEntry(value));
@@ -91,11 +91,11 @@
   }
 
   /**
-   * Returns the {@link org.apache.hama.io.MapWritable}
+   * Returns the {@link org.apache.hama.io.HamaMapWritable}
    *
    * @return the entries of vector
    */
-  public MapWritable<DoubleEntry> getEntries() {
+  public HamaMapWritable<DoubleEntry> getEntries() {
     return this.entries;
   }
 }
Index: src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
===================================================================
--- src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(revision 728567)
+++ src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java	(working copy)
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -31,7 +32,6 @@
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.mapred.BlockInputFormat;
 import org.apache.log4j.Logger;
@@ -37,7 +37,7 @@
 import org.apache.log4j.Logger;
 
 public class BlockCyclicMultiplyMap extends MapReduceBase implements
-    Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
+    Mapper<IntWritable, BlockWritable, IntWritable, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
   protected DenseMatrix matrix_b;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";
@@ -51,7 +51,7 @@
   }
 
   public static void initJob(String matrix_a, String matrix_b,
-      Class<BlockCyclicMultiplyMap> map, Class<BlockID> outputKeyClass,
+      Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
       Class<BlockWritable> outputValueClass, JobConf jobConf) {
 
     jobConf.setMapOutputValueClass(outputValueClass);
@@ -66,16 +66,38 @@
   }
 
   @Override
-  public void map(BlockID key, BlockWritable value,
-      OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+  public void map(IntWritable key, BlockWritable value,
+      OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
       throws IOException {
-    int blockSize = matrix_b.getBlockSize();
+    for (int i = 0; i < value.size(); i++) {
+      SubMatrix a = value.get(i);
+      for (int j = 0; j < matrix_b.getBlockSize(); j++) {
+        SubMatrix b = matrix_b.getBlock(i, j);
+        SubMatrix c = a.mult(b);
+        output.collect(key, new BlockWritable(key.get(), j, c));
+      }
+    }
+
+    /*
     SubMatrix a = value.get();
-    for (int j = 0; j < blockSize; j++) {
-      SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
+    HTable table = matrix_b.getHTable();
+
+    // startKey : new BlockID(key.getColumn(), 0).toString()
+    // endKey : new BlockID(key.getColumn(), blockSize+1).toString()
+    Scanner scan = table.getScanner(
+        new byte[][] { Bytes.toBytes(Constants.BLOCK) },
+        new BlockID(key.getColumn(), 0).getBytes(),
+        new BlockID(key.getColumn(), blockSize + 1).getBytes());
+
+    for (RowResult row : scan) {
+      BlockID bid = new BlockID(row.getRow());
+      SubMatrix b = new SubMatrix(row.get(Constants.BLOCK).getValue());
       SubMatrix c = a.mult(b);
-      output.collect(new BlockID(key.getRow(), j), new BlockWritable(c));
+      output.collect(new BlockID(key.getRow(),
+          bid.getColumn()), new BlockWritable(c));
     }
+    scan.close();
+    */
   }
 }
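To make the new keying concrete: the mapper now receives one whole block row of A under an IntWritable key, and for each pair (i, j) it emits the partial product A[row][i] * B[i][j] back under that same row key, tagging the block column j inside the BlockWritable. A small self-contained sketch of one such emission (illustrative values; SubMatrix(rows, cols) is assumed to zero-fill, as Java double arrays do):

    import org.apache.hama.SubMatrix;
    import org.apache.hama.io.BlockWritable;

    public class MapEmitSketch {
      public static void main(String[] args) throws Exception {
        SubMatrix a = new SubMatrix(2, 2);
        a.set(0, 0, 1.0);
        a.set(1, 1, 1.0); // 2x2 identity, standing in for A[0][0]
        SubMatrix b = new SubMatrix(2, 2);
        b.set(0, 1, 3.0); // standing in for B[0][1]

        // What map() emits for block row 0, block column j = 1:
        BlockWritable partial = new BlockWritable(0, 1, a.mult(b));
        System.out.println(partial.get(1).get(0, 1)); // 3.0
      }
    }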
Index: src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
===================================================================
--- src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java	(revision 728567)
+++ src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java	(working copy)
@@ -20,7 +20,9 @@
 package org.apache.hama.algebra;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.IntWritable;
@@ -30,7 +32,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockEntry;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.mapred.VectorOutputFormat;
@@ -37,7 +39,7 @@
 import org.apache.log4j.Logger;
 
 public class BlockCyclicMultiplyReduce extends MapReduceBase implements
-    Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
+    Reducer<IntWritable, BlockWritable, IntWritable, VectorUpdate> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
 
   /**
@@ -58,29 +60,39 @@
   }
 
   @Override
-  public void reduce(BlockID key, Iterator<BlockWritable> values,
+  public void reduce(IntWritable key, Iterator<BlockWritable> values,
       OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
       throws IOException {
+    int row = key.get();
+    Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
 
-    SubMatrix s = null;
     while (values.hasNext()) {
-      SubMatrix b = values.next().get();
-      if (s == null) {
-        s = b;
-      } else {
-        s = s.add(b);
+      BlockWritable b = values.next();
+      for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
+        int j = e.getKey();
+        SubMatrix value = e.getValue().getValue();
+        if (sum.containsKey(j)) {
+          sum.put(j, sum.get(j).add(value));
+        } else {
+          sum.put(j, value);
+        }
       }
     }
 
-    int startRow = key.getRow() * s.getRows();
-    int startColumn = key.getColumn() * s.getColumns();
+    for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
+      int column = e.getKey();
+      SubMatrix mat = e.getValue();
 
-    for (int i = 0; i < s.getRows(); i++) {
-      VectorUpdate update = new VectorUpdate(i + startRow);
-      for (int j = 0; j < s.getColumns(); j++) {
-        update.put(j + startColumn, s.get(i, j));
+      int startRow = row * mat.getRows();
+      int startColumn = column * mat.getColumns();
+
+      for (int i = 0; i < mat.getRows(); i++) {
+        VectorUpdate update = new VectorUpdate(i + startRow);
+        for (int j = 0; j < mat.getColumns(); j++) {
+          update.put(j + startColumn, mat.get(i, j));
+        }
+        output.collect(key, update);
       }
-      output.collect(new IntWritable(key.getRow()), update);
     }
   }
 }
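The reducer's first loop is just a keyed sum: partial products arriving for the same block column j are folded together before any cells are written. The accumulation step in isolation, with two hand-made partials for one column:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hama.SubMatrix;

    public class ReduceSumSketch {
      public static void main(String[] args) {
        Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
        SubMatrix p1 = new SubMatrix(2, 2);
        p1.set(0, 0, 1.0);
        SubMatrix p2 = new SubMatrix(2, 2);
        p2.set(0, 0, 2.0);

        int j = 0; // both partials belong to block column 0
        for (SubMatrix p : new SubMatrix[] { p1, p2 }) {
          sum.put(j, sum.containsKey(j) ? sum.get(j).add(p) : p);
        }
        System.out.println(sum.get(0).get(0, 0)); // 3.0
      }
    }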
Index: src/java/org/apache/hama/DenseMatrix.java
===================================================================
--- src/java/org/apache/hama/DenseMatrix.java	(revision 728567)
+++ src/java/org/apache/hama/DenseMatrix.java	(working copy)
@@ -52,7 +52,7 @@
 import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HamaMapWritable;
 import org.apache.hama.io.VectorUpdate;
 import org.apache.hama.io.VectorWritable;
 import org.apache.hama.mapred.BlockingMapRed;
@@ -64,9 +64,8 @@
 public class DenseMatrix extends AbstractMatrix implements Matrix {
   static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
   static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
-  static private final Path TMP_DIR = new Path(DenseMatrix.class
-      .getSimpleName()
-      + "_TMP_dir");
+  static public final Path TMP_DIR = new Path(DenseMatrix.class
+      .getSimpleName() + "_TMP_dir");
 
   /**
    * Construct a raw matrix. Just create a table in HBase, but didn't lay any
@@ -375,7 +374,7 @@
     byte[][] c = { columnKey };
     Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
 
-    MapWritable<DoubleEntry> trunk = new MapWritable<DoubleEntry>();
+    HamaMapWritable<DoubleEntry> trunk = new HamaMapWritable<DoubleEntry>();
 
     for (RowResult row : scan) {
       trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
@@ -393,12 +392,12 @@
     jobConf.setNumMapTasks(config.getNumMapTasks());
     jobConf.setNumReduceTasks(config.getNumReduceTasks());
-    
+
     if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
       BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
-          BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
+          BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
           jobConf);
-      BlockCyclicMultiplyReduce.initJob(result.getPath(), 
+      BlockCyclicMultiplyReduce.initJob(result.getPath(),
           BlockCyclicMultiplyReduce.class, jobConf);
     } else {
       SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
@@ -404,10 +403,8 @@
       SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
           SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
           jobConf);
-      SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-          jobConf);
+      SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class, jobConf);
     }
-
     JobManager.execute(jobConf, result);
     return result;
   }
@@ -482,8 +479,8 @@
   }
 
   public SubMatrix getBlock(int i, int j) throws IOException {
-    return new SubMatrix(table.get(new BlockID(i, j).getBytes(),
-        Bytes.toBytes(Constants.BLOCK)).getValue());
+    return new SubMatrix(table.get(String.valueOf(i), Constants.BLOCK + j)
+        .getValue());
   }
 
   /**
@@ -502,8 +499,8 @@
   }
 
   public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-    update.put(Bytes.toBytes(Constants.BLOCK), matrix.getBytes());
+    BatchUpdate update = new BatchUpdate(String.valueOf(i));
+    update.put(Bytes.toBytes(Constants.BLOCK + j), matrix.getBytes());
     table.commit(update);
   }
 
@@ -526,7 +523,7 @@
     if (endColumn >= this.getColumns())
       endColumn = this.getColumns() - 1;
 
-    BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
+    BatchUpdate update = new BatchUpdate(new BlockID(i, j).toString());
     update.put(Constants.BLOCK_POSITION, new BlockPosition(startRow, endRow,
         startColumn, endColumn).getBytes());
     table.commit(update);
@@ -539,8 +536,8 @@
   }
 
   protected BlockPosition getBlockPosition(int i, int j) throws IOException {
-    byte[] rs = table.get(new BlockID(i, j).getBytes(),
-        Bytes.toBytes(Constants.BLOCK_POSITION)).getValue();
+    byte[] rs = table.get(new BlockID(i, j).toString(),
+        Constants.BLOCK_POSITION).getValue();
     return new BlockPosition(rs);
   }
Index: src/java/org/apache/hama/DenseVector.java
===================================================================
--- src/java/org/apache/hama/DenseVector.java	(revision 728567)
+++ src/java/org/apache/hama/DenseVector.java	(working copy)
@@ -26,7 +26,7 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HamaMapWritable;
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
@@ -34,10 +34,10 @@
   static final Logger LOG = Logger.getLogger(DenseVector.class);
 
   public DenseVector() {
-    this(new MapWritable<DoubleEntry>());
+    this(new HamaMapWritable<DoubleEntry>());
   }
 
-  public DenseVector(MapWritable<DoubleEntry> m) {
+  public DenseVector(HamaMapWritable<DoubleEntry> m) {
     this.entries = m;
   }
 
@@ -42,7 +42,7 @@
   }
 
   public DenseVector(RowResult row) {
-    this.entries = new MapWritable<DoubleEntry>();
+    this.entries = new HamaMapWritable<DoubleEntry>();
     for (Map.Entry<byte[], Cell> f : row.entrySet()) {
       this.entries.put(BytesUtil.getColumnIndex(f.getKey()),
           new DoubleEntry(f.getValue()));
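Storage-layout note: a blocked matrix row now occupies a single HBase row whose key is String.valueOf(i), with one column qualifier per block (Constants.BLOCK + j); this is what lets BlockInputFormat hand a mapper the whole block row at once. A sketch of the new accessors, assuming a running HBase/Hama cluster and the raw-matrix constructor used in the tests:

    import java.io.IOException;
    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.SubMatrix;

    public class BlockLayoutSketch {
      public static void main(String[] args) throws IOException {
        DenseMatrix m = new DenseMatrix(new HamaConfiguration());
        SubMatrix block = new SubMatrix(2, 2);
        block.set(0, 0, 4.0);
        m.setBlock(1, 2, block);           // row "1", column Constants.BLOCK + 2
        SubMatrix read = m.getBlock(1, 2);
        System.out.println(read.get(0, 0)); // 4.0
      }
    }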
Index: src/java/org/apache/hama/io/BlockID.java
===================================================================
--- src/java/org/apache/hama/io/BlockID.java	(revision 728886)
+++ src/java/org/apache/hama/io/BlockID.java	(working copy)
@@ -19,14 +19,11 @@
  */
 package org.apache.hama.io;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 
 /** A WritableComparable for BlockIDs. */
@@ -31,8 +28,8 @@
 /** A WritableComparable for BlockIDs. */
 @SuppressWarnings("unchecked")
-public class BlockID implements WritableComparable, java.io.Serializable {
-  private static final long serialVersionUID = 6434651179475226613L;
+public class BlockID implements WritableComparable {
+  public static final int PAD_SIZE = 30;
   private int row;
   private int column;
@@ -44,18 +41,24 @@
   }
 
   public BlockID(byte[] bytes) throws IOException {
-    ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
-    try {
-      obj = oos.readObject();
-      this.row = ((BlockID)obj).getRow();
-      this.column = ((BlockID)obj).getColumn();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
-    }
-    oos.close();
-    bos.close();
+    // Keys look like "0...0,row,column": skip the zero padding, then split
+    // on the commas. The remainder starts with the comma that follows the
+    // pad, so fields 1 and 2 of the split hold the coordinates.
+    String rKey = Bytes.toString(bytes);
+    int i = 0;
+    while (rKey.charAt(i) == '0') {
+      i++;
+    }
+    String[] keys = rKey.substring(i).split("[,]");
+
+    this.row = Integer.parseInt(keys[1]);
+    this.column = Integer.parseInt(keys[2]);
   }
 
   public void set(int row, int column) {
@@ -72,13 +75,13 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    column = in.readInt();
-    row = in.readInt();
+    BlockID value = new BlockID(Bytes.readByteArray(in));
+    this.row = value.getRow();
+    this.column = value.getColumn();
   }
 
   public void write(DataOutput out) throws IOException {
-    out.writeInt(column);
-    out.writeInt(row);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.toString()));
   }
 
   /**
@@ -85,7 +88,14 @@
    * Make BlockID's string representation be same format.
    */
   public String toString() {
-    return row + "," + column;
+    int zeros = PAD_SIZE - String.valueOf(row).length()
+        - String.valueOf(column).length();
+    StringBuffer buf = new StringBuffer();
+    for (int i = 0; i < zeros; ++i) {
+      buf.append("0");
+    }
+
+    return buf.toString() + "," + row + "," + column;
   }
 
   @Override
@@ -110,19 +120,14 @@
 
   @Override
   public boolean equals(Object o) {
-    if(o == null) return false;
-    if(!(o instanceof BlockID)) return false;
+    if (o == null)
+      return false;
+    if (!(o instanceof BlockID))
+      return false;
 
     return compareTo(o) == 0;
   }
 
-  public byte[] getBytes() throws IOException {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
-    bos.close();
-    byte[] data = bos.toByteArray();
-    return data;
+  public byte[] getBytes() {
+    return Bytes.toBytes(this.toString());
   }
 }
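With PAD_SIZE = 30, every key is zero-padded to the same overall length (30 digit characters plus two commas), and the byte[] constructor recovers the coordinates by stripping the pad and splitting on the commas. A round trip, per the code above:

    import org.apache.hama.io.BlockID;

    public class BlockIDSketch {
      public static void main(String[] args) throws Exception {
        BlockID id = new BlockID(1, 3);
        String key = id.toString();       // 28 zeros, then ",1,3"
        System.out.println(key.length()); // 32, for every BlockID
        BlockID parsed = new BlockID(id.getBytes());
        System.out.println(parsed.getRow() + "," + parsed.getColumn()); // 1,3
      }
    }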
Index: src/java/org/apache/hama/io/BlockMapWritable.java
===================================================================
--- src/java/org/apache/hama/io/BlockMapWritable.java	(revision 0)
+++ src/java/org/apache/hama/io/BlockMapWritable.java	(revision 0)
@@ -0,0 +1,170 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockMapWritable<V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf =
+      new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS =
+      new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE =
+      new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Map.Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getBlockIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(BytesUtil.getBlockIndex(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}
Index: src/java/org/apache/hama/io/BlockWritable.java
===================================================================
--- src/java/org/apache/hama/io/BlockWritable.java	(revision 728886)
+++ src/java/org/apache/hama/io/BlockWritable.java	(working copy)
@@ -22,6 +22,12 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -26,31 +32,124 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.SubMatrix;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockWritable implements Writable, Map<Integer, BlockEntry> {
 
-public class BlockWritable implements Writable {
-  public SubMatrix matrix;
+  public Integer row;
+  public BlockMapWritable<BlockEntry> entries;
 
   public BlockWritable() {
-    this.matrix = new SubMatrix(0, 0);
+    this(new BlockMapWritable<BlockEntry>());
+  }
+
+  public BlockWritable(BlockMapWritable<BlockEntry> entries) {
+    this.entries = entries;
+  }
+
+  public BlockWritable(int i, int j, SubMatrix mult) throws IOException {
+    this.row = i;
+    BlockMapWritable<BlockEntry> tr = new BlockMapWritable<BlockEntry>();
+    tr.put(j, new BlockEntry(mult));
+    this.entries = tr;
+  }
+
+  public int size() {
+    return this.entries.size();
+  }
+
+  public SubMatrix get(int key) throws IOException {
+    return this.entries.get(key).getValue();
+  }
+
+  public BlockEntry put(Integer key, BlockEntry value) {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  public BlockEntry get(Object key) {
+    return this.entries.get(key);
+  }
+
+  public BlockEntry remove(Object key) {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException("Don't support containsValue!");
+  }
+
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+
+  public void clear() {
+    throw new UnsupportedOperationException("BlockWritable is read-only!");
+  }
+
+  public Set<Integer> keySet() {
+    Set<Integer> result = new TreeSet<Integer>();
+    for (Integer w : entries.keySet()) {
+      result.add(w);
+    }
+    return result;
   }
 
-  public BlockWritable(SubMatrix c) {
-    this.matrix = c;
+  public Set<Map.Entry<Integer, BlockEntry>> entrySet() {
+    return Collections.unmodifiableSet(this.entries.entrySet());
   }
 
-  public BlockWritable(byte[] bytes) throws IOException {
-    this.matrix = new SubMatrix(bytes);
+  public Collection<BlockEntry> values() {
+    ArrayList<BlockEntry> result = new ArrayList<BlockEntry>();
+    for (Writable w : entries.values()) {
+      result.add((BlockEntry) w);
+    }
+    return result;
   }
 
-  public void readFields(DataInput in) throws IOException {
-    this.matrix = new SubMatrix(Bytes.readByteArray(in));
+  public void readFields(final DataInput in) throws IOException {
+    this.row = BytesUtil.bytesToInt(Bytes.readByteArray(in));
+    this.entries.readFields(in);
   }
 
-  public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.matrix.getBytes());
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, BytesUtil.intToBytes(this.row));
+    this.entries.write(out);
   }
 
-  public SubMatrix get() {
-    return this.matrix;
+  public void putAll(Map<? extends Integer, ? extends BlockEntry> m) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  /**
+   * The inner class for an entry of a row.
+   */
+  public static class Entries implements Map.Entry<byte[], BlockEntry> {
+
+    private final byte[] column;
+    private final BlockEntry entry;
+
+    Entries(byte[] column, BlockEntry entry) {
+      this.column = column;
+      this.entry = entry;
+    }
+
+    public BlockEntry setValue(BlockEntry c) {
+      throw new UnsupportedOperationException("BlockWritable is read-only!");
+    }
+
+    public byte[] getKey() {
+      byte[] key = column;
+      return key;
+    }
+
+    public BlockEntry getValue() {
+      return entry;
+    }
+  }
 }
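BlockWritable thus changes from a single-SubMatrix wrapper into a read-only Map view of one block row: the block-row index plus a column-indexed map of BlockEntry values. Construction and lookup in isolation (mutators such as put() intentionally throw):

    import org.apache.hama.SubMatrix;
    import org.apache.hama.io.BlockWritable;

    public class BlockWritableSketch {
      public static void main(String[] args) throws Exception {
        SubMatrix c = new SubMatrix(2, 2);
        c.set(1, 1, 5.0);
        BlockWritable w = new BlockWritable(4, 7, c); // block row 4, column 7
        System.out.println(w.size());                 // 1
        System.out.println(w.get(7).get(1, 1));       // 5.0
        // w.put(8, ...) would throw UnsupportedOperationException
      }
    }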
Index: src/java/org/apache/hama/io/HamaMapWritable.java
===================================================================
--- src/java/org/apache/hama/io/HamaMapWritable.java	(revision 0)
+++ src/java/org/apache/hama/io/HamaMapWritable.java	(revision 0)
@@ -0,0 +1,190 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class HamaMapWritable<V> implements Map<Integer, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf =
+      new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS =
+      new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE =
+      new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(DoubleEntry.class, code++);
+    addToMap(BlockEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Map.Entry<Integer, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Integer> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Integer, V> e : instance.entrySet()) {
+      Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(BytesUtil.getColumnIndex(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Integer, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Integer key, V value) {
+    return this.instance.put(key, value);
+  }
+}
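HamaMapWritable writes each entry as the encoded column index, a one-byte class code from the static table, then the value's own Writable form, so heterogeneous value types survive a round trip. A serialization round trip with a DoubleEntry (assuming DoubleEntry keeps its usual double constructor and getValue() accessor):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.io.DoubleEntry;
    import org.apache.hama.io.HamaMapWritable;

    public class HamaMapWritableSketch {
      public static void main(String[] args) throws Exception {
        HamaMapWritable<DoubleEntry> map = new HamaMapWritable<DoubleEntry>();
        map.put(2, new DoubleEntry(0.5));

        DataOutputBuffer out = new DataOutputBuffer();
        map.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        HamaMapWritable<DoubleEntry> copy = new HamaMapWritable<DoubleEntry>();
        copy.readFields(in);
        System.out.println(copy.get(2).getValue()); // 0.5
      }
    }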
Index: src/java/org/apache/hama/io/MapWritable.java
===================================================================
--- src/java/org/apache/hama/io/MapWritable.java	(revision 728567)
+++ src/java/org/apache/hama/io/MapWritable.java	(working copy)
@@ -1,190 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BytesUtil;
-
-public class MapWritable<V> implements Map<Integer, V>, Writable,
-    Configurable {
-  private AtomicReference<Configuration> conf =
-      new AtomicReference<Configuration>();
-
-  // Static maps of code to class and vice versa. Includes types used in hama
-  // only.
-  static final Map<Byte, Class<?>> CODE_TO_CLASS =
-      new HashMap<Byte, Class<?>>();
-  static final Map<Class<?>, Byte> CLASS_TO_CODE =
-      new HashMap<Class<?>, Byte>();
-
-  static {
-    byte code = 0;
-    addToMap(HStoreKey.class, code++);
-    addToMap(ImmutableBytesWritable.class, code++);
-    addToMap(Text.class, code++);
-    addToMap(DoubleEntry.class, code++);
-    addToMap(BlockEntry.class, code++);
-    addToMap(byte[].class, code++);
-  }
-
-  @SuppressWarnings("boxing")
-  private static void addToMap(final Class<?> clazz, final byte code) {
-    CLASS_TO_CODE.put(clazz, code);
-    CODE_TO_CLASS.put(code, clazz);
-  }
-
-  private Map<Integer, V> instance = new TreeMap<Integer, V>();
-
-  /** @return the conf */
-  public Configuration getConf() {
-    return conf.get();
-  }
-
-  /** @param conf the conf to set */
-  public void setConf(Configuration conf) {
-    this.conf.set(conf);
-  }
-
-  /** {@inheritDoc} */
-  public void clear() {
-    instance.clear();
-  }
-
-  /** {@inheritDoc} */
-  public boolean containsKey(Object key) {
-    return instance.containsKey(key);
-  }
-
-  /** {@inheritDoc} */
-  public boolean containsValue(Object value) {
-    return instance.containsValue(value);
-  }
-
-  /** {@inheritDoc} */
-  public Set<Map.Entry<Integer, V>> entrySet() {
-    return instance.entrySet();
-  }
-
-  /** {@inheritDoc} */
-  public V get(Object key) {
-    return instance.get(key);
-  }
-
-  /** {@inheritDoc} */
-  public boolean isEmpty() {
-    return instance.isEmpty();
-  }
-
-  /** {@inheritDoc} */
-  public Set<Integer> keySet() {
-    return instance.keySet();
-  }
-
-  /** {@inheritDoc} */
-  public int size() {
-    return instance.size();
-  }
-
-  /** {@inheritDoc} */
-  public Collection<V> values() {
-    return instance.values();
-  }
-
-  // Writable
-
-  /** @return the Class class for the specified id */
-  protected Class<?> getClass(byte id) {
-    return CODE_TO_CLASS.get(id);
-  }
-
-  /** @return the id for the specified Class */
-  protected byte getId(Class<?> clazz) {
-    Byte b = CLASS_TO_CODE.get(clazz);
-    if (b == null) {
-      throw new NullPointerException("Nothing for : " + clazz);
-    }
-    return b;
-  }
-
-  @Override
-  public String toString() {
-    return this.instance.toString();
-  }
-
-  /** {@inheritDoc} */
-  public void write(DataOutput out) throws IOException {
-    // Write out the number of entries in the map
-    out.writeInt(this.instance.size());
-
-    // Then write out each key/value pair
-    for (Map.Entry<Integer, V> e : instance.entrySet()) {
-      Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
-      out.writeByte(getId(e.getValue().getClass()));
-      ((Writable) e.getValue()).write(out);
-    }
-  }
-
-  /** {@inheritDoc} */
-  @SuppressWarnings("unchecked")
-  public void readFields(DataInput in) throws IOException {
-    // First clear the map. Otherwise we will just accumulate
-    // entries every time this method is called.
-    this.instance.clear();
-
-    // Read the number of entries in the map
-    int entries = in.readInt();
-
-    // Then read each key/value pair
-    for (int i = 0; i < entries; i++) {
-      byte[] key = Bytes.readByteArray(in);
-      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
-          .readByte()), getConf());
-      value.readFields(in);
-      V v = (V) value;
-      this.instance.put(BytesUtil.getColumnIndex(key), v);
-    }
-  }
-
-  public void putAll(Map<? extends Integer, ? extends V> m) {
-    this.instance.putAll(m);
-  }
-
-  public V remove(Object key) {
-    return this.instance.remove(key);
-  }
-
-  public V put(Integer key, V value) {
-    return this.instance.put(key, value);
-  }
-}
Index: src/java/org/apache/hama/io/VectorWritable.java
===================================================================
--- src/java/org/apache/hama/io/VectorWritable.java	(revision 728886)
+++ src/java/org/apache/hama/io/VectorWritable.java	(working copy)
@@ -38,13 +38,13 @@
 public class VectorWritable implements Writable, Map<Integer, DoubleEntry> {
 
   public Integer row;
-  public MapWritable<DoubleEntry> entries;
+  public HamaMapWritable<DoubleEntry> entries;
 
   public VectorWritable() {
-    this(new MapWritable<DoubleEntry>());
+    this(new HamaMapWritable<DoubleEntry>());
   }
 
-  public VectorWritable(MapWritable<DoubleEntry> entries) {
+  public VectorWritable(HamaMapWritable<DoubleEntry> entries) {
     this.entries = entries;
   }
Index: src/java/org/apache/hama/mapred/BlockInputFormat.java
===================================================================
--- src/java/org/apache/hama/mapred/BlockInputFormat.java	(revision 728567)
+++ src/java/org/apache/hama/mapred/BlockInputFormat.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -32,12 +33,11 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.Constants;
-import org.apache.hama.io.BlockID;
 import org.apache.hama.io.BlockWritable;
+import org.apache.hama.util.BytesUtil;
 
 public class BlockInputFormat extends TableInputFormatBase implements
-    InputFormat<BlockID, BlockWritable>, JobConfigurable {
+    InputFormat<IntWritable, BlockWritable>, JobConfigurable {
   static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
 
@@ -45,7 +45,7 @@
    * Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
    */
   protected static class TableRecordReader extends TableRecordReaderBase
-      implements RecordReader<BlockID, BlockWritable> {
+      implements RecordReader<IntWritable, BlockWritable> {
 
     /**
     * @return IntWritable
@@ -52,8 +52,8 @@
     *
     * @see org.apache.hadoop.mapred.RecordReader#createKey()
     */
-    public BlockID createKey() {
-      return new BlockID();
+    public IntWritable createKey() {
+      return new IntWritable();
    }
 
    /**
@@ -66,10 +66,10 @@
    }
 
    /**
-    * @param key BlockID as input key.
+    * @param key IntWritable as input key.
     * @param value BlockWritable as input value
     *
-    * Converts Scanner.next() to BlockID, BlockWritable
+    * Converts Scanner.next() to IntWritable, BlockWritable
     *
     * @return true if there was more data
     * @throws IOException
@@ -74,7 +74,7 @@
     * @return true if there was more data
     * @throws IOException
     */
-    public boolean next(BlockID key, BlockWritable value)
+    public boolean next(IntWritable key, BlockWritable value)
        throws IOException {
      RowResult result = this.scanner.next();
      boolean hasMore = result != null && result.size() > 0;
@@ -79,11 +79,8 @@
      RowResult result = this.scanner.next();
      boolean hasMore = result != null && result.size() > 0;
      if (hasMore) {
-        byte[] row = result.getRow();
-        BlockID bID = new BlockID(row);
-        key.set(bID.getRow(), bID.getColumn());
-        byte[] rs = result.get(Constants.BLOCK).getValue();
-        Writables.copyWritable(new BlockWritable(rs), value);
+        key.set(BytesUtil.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
      }
      return hasMore;
    }
@@ -96,7 +93,7 @@
   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
   *      JobConf, Reporter)
   */
-  public RecordReader<BlockID, BlockWritable> getRecordReader(
+  public RecordReader<IntWritable, BlockWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    TableSplit tSplit = (TableSplit) split;
    TableRecordReader trr = this.tableRecordReader;
Index: src/java/org/apache/hama/SubMatrix.java
===================================================================
--- src/java/org/apache/hama/SubMatrix.java	(revision 728567)
+++ src/java/org/apache/hama/SubMatrix.java	(working copy)
@@ -21,9 +21,9 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
 import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
@@ -59,18 +59,19 @@
 
   public SubMatrix(byte[] matrix) throws IOException {
     ByteArrayInputStream bos = new ByteArrayInputStream(matrix);
-    ObjectInputStream oos = new ObjectInputStream(bos);
-    Object obj = null;
-    try {
-      obj = oos.readObject();
-      this.matrix = ((SubMatrix)obj).getDoubleArray();
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
+    DataInputStream dis = new DataInputStream(bos);
+    int rows = dis.readInt();
+    int columns = dis.readInt();
+    this.matrix = new double[rows][columns];
+    for (int i = 0; i < rows; i++) {
+      for (int j = 0; j < columns; j++) {
+        this.set(i, j, dis.readDouble());
+      }
     }
-    oos.close();
+    dis.close();
     bos.close();
   }
-  
+
   /**
    * Sets the value
    *
@@ -90,9 +91,9 @@
    * @param value
    */
   public void set(int row, int column, byte[] value) {
-    matrix[row][column] = BytesUtil.bytesToDouble(value);    
+    matrix[row][column] = BytesUtil.bytesToDouble(value);
   }
-  
+
   /**
    * Gets the value
   *
@@ -116,7 +117,7 @@
   */
  public SubMatrix add(SubMatrix b) {
    SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
-    
+
    for (int i = 0; i < this.getRows(); i++) {
      for (int j = 0; j < this.getColumns(); j++) {
        c.set(i, j, (this.get(i, j) + b.get(i, j)));
@@ -134,7 +135,7 @@
   */
  public SubMatrix mult(SubMatrix b) {
    SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
-    
+
    for (int i = 0; i < this.getRows(); i++) {
      for (int j = 0; j < b.getColumns(); j++) {
        for (int k = 0; k < this.getColumns(); k++) {
@@ -187,10 +188,16 @@
   */
  public byte[] getBytes() throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    ObjectOutputStream oos = new ObjectOutputStream(bos);
-    oos.writeObject(this);
-    oos.flush();
-    oos.close();
+    DataOutputStream dos = new DataOutputStream(bos);
+    dos.writeInt(this.getRows());
+    dos.writeInt(this.getColumns());
+    for (int i = 0; i < this.getRows(); i++) {
+      for (int j = 0; j < this.getColumns(); j++) {
+        dos.writeDouble(this.get(i, j));
+      }
+    }
+    dos.flush();
+    dos.close();
     bos.close();
     byte[] data = bos.toByteArray();
     return data;
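Dropping ObjectOutputStream for a plain DataOutputStream makes the on-disk form a fixed header (rows, columns) followed by row-major doubles, with no Java-serialization overhead. A quick round-trip check:

    import org.apache.hama.SubMatrix;

    public class SubMatrixBytesSketch {
      public static void main(String[] args) throws Exception {
        SubMatrix m = new SubMatrix(2, 3);
        m.set(1, 2, 9.0);
        SubMatrix copy = new SubMatrix(m.getBytes());
        System.out.println(copy.getRows() + "x" + copy.getColumns()); // 2x3
        System.out.println(copy.get(1, 2)); // 9.0
      }
    }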
Index: src/test/org/apache/hama/io/TestBlockID.java
===================================================================
--- src/test/org/apache/hama/io/TestBlockID.java	(revision 728567)
+++ src/test/org/apache/hama/io/TestBlockID.java	(working copy)
@@ -25,8 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
 
 public class TestBlockID extends TestCase {
   final static Log LOG = LogFactory.getLog(TestBlockID.class.getName());
@@ -33,36 +32,20 @@
 
   /**
    * BlockID object compare
+   * @throws IOException
    */
-  public void testCompare() {
+  public void testCompare() throws IOException {
     BlockID a = new BlockID(1, 3);
     BlockID b = new BlockID(1, 1);
 
-    assertEquals(a.compareTo(b), 1);
-
-    BlockID c = new BlockID(3, 1);
-    BlockID d = new BlockID(1, 1);
-    assertEquals(a.compareTo(c), -1);
-
-    assertEquals(b.compareTo(d), 0);
-  }
-
-  /**
-   * BlockID object IO
-   * @throws IOException
-   */
-  public void testIO() throws IOException {
-    DataOutputBuffer outBuf = new DataOutputBuffer();
-    DataInputBuffer inBuf = new DataInputBuffer();
-    BlockID a = new BlockID(1, 3);
-    outBuf.reset();
-    a.write(outBuf);
+    byte[] c = Bytes.toBytes(a.toString());
+    assertEquals(a.getRow(), 1);
+    assertEquals(a.getColumn(), 3);
+    assertEquals(b.getRow(), 1);
+    assertEquals(b.getColumn(), 1);
 
-    inBuf.reset(outBuf.getData(), outBuf.getLength());
-    BlockID b = new BlockID();
-    b.readFields(inBuf);
-
-    assertEquals(0, a.compareTo(b));
+    BlockID d = new BlockID(c);
+    assertEquals(d.getRow(), 1);
+    assertEquals(d.getColumn(), 3);
   }
-
 }
Index: src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
===================================================================
--- src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(revision 728567)
+++ src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java	(working copy)
@@ -21,15 +21,9 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HCluster;
 import org.apache.hama.Matrix;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
 import org.apache.log4j.Logger;
 
 public class TestBlockMatrixMapReduce extends HCluster {
@@ -34,7 +28,6 @@
 public class TestBlockMatrixMapReduce extends HCluster {
   static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
-  static Matrix c;
   static final int SIZE = 32;
 
   /** constructor */
@@ -50,8 +43,9 @@
     ((DenseMatrix) m1).blocking_mapred(16);
     ((DenseMatrix) m2).blocking_mapred(16);
 
-    miniMRJob(m1.getPath(), m2.getPath());
-
+    Matrix c = m1.mult(m2);
+    assertEquals(c.getRows(), SIZE);
+    assertEquals(c.getColumns(), SIZE);
     double[][] C = new double[SIZE][SIZE];
     for (int i = 0; i < SIZE; i++) {
       for (int j = 0; j < SIZE; j++) {
@@ -68,23 +62,4 @@
       }
     }
   }
-
-  private void miniMRJob(String string, String string2) throws IOException {
-    c = new DenseMatrix(conf);
-    String output = c.getPath();
-
-    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
-    jobConf.setJobName("test MR job");
-
-    BlockCyclicMultiplyMap.initJob(string, string2,
-        BlockCyclicMultiplyMap.class, BlockID.class, BlockWritable.class,
-        jobConf);
-    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class,
-        jobConf);
-
-    jobConf.setNumMapTasks(2);
-    jobConf.setNumReduceTasks(2);
-
-    JobClient.runJob(jobConf);
-  }
 }