Index: commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java =================================================================== --- commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java (revision 1548979) +++ commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hama.commons.math.DenseDoubleVector; import org.apache.hama.commons.math.DoubleVector; +import org.apache.hama.commons.math.NamedDoubleVector; /** * Writable for dense vectors. @@ -102,6 +103,13 @@ for (int i = 0; i < vector.getDimension(); i++) { out.writeDouble(vector.get(i)); } + + if (vector.isNamed() && vector.getName() != null) { + out.writeBoolean(true); + out.writeUTF(vector.getName()); + } else { + out.writeBoolean(false); + } } public static DoubleVector readVector(DataInput in) throws IOException { @@ -111,6 +119,10 @@ for (int i = 0; i < length; i++) { vector.set(i, in.readDouble()); } + + if (in.readBoolean()) { + vector = new NamedDoubleVector(in.readUTF(), vector); + } return vector; } Index: commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java =================================================================== --- commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java (revision 1548979) +++ commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java (working copy) @@ -181,10 +181,6 @@ return newv; } - /* - * (non-Javadoc) - * @see de.jungblut.math.DoubleVector#subtract(de.jungblut.math.DoubleVector) - */ @Override public final DoubleVector subtractUnsafe(DoubleVector v) { DoubleVector newv = new DenseDoubleVector(v.getLength()); @@ -194,10 +190,6 @@ return newv; } - /* - * (non-Javadoc) - * @see de.jungblut.math.DoubleVector#subtract(double) - */ @Override public final DoubleVector subtract(double v) { DenseDoubleVector newv = new DenseDoubleVector(vector.length); Index: commons/src/main/java/org/apache/hama/commons/math/NamedDoubleVector.java =================================================================== --- commons/src/main/java/org/apache/hama/commons/math/NamedDoubleVector.java (revision 0) +++ commons/src/main/java/org/apache/hama/commons/math/NamedDoubleVector.java (working copy) @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.commons.math; + +import java.util.Iterator; + +public final class NamedDoubleVector implements DoubleVector { + + private final String name; + private final DoubleVector vector; + + public NamedDoubleVector(String name, DoubleVector deepCopy) { + super(); + this.name = name; + this.vector = deepCopy; + } + + @Override + public double get(int index) { + return vector.get(index); + } + + @Override + public int getLength() { + return vector.getLength(); + } + + @Override + public int getDimension() { + return vector.getDimension(); + } + + @Override + public void set(int index, double value) { + vector.set(index, value); + } + + @Override + @Deprecated + public DoubleVector apply(DoubleVectorFunction func) { + return vector.apply(func); + } + + @Override + @Deprecated + public DoubleVector apply(DoubleVector other, DoubleDoubleVectorFunction func) { + return vector.apply(other, func); + } + + @Override + public DoubleVector applyToElements(DoubleFunction func) { + return vector.applyToElements(func); + } + + @Override + public DoubleVector applyToElements(DoubleVector other, + DoubleDoubleFunction func) { + return vector.applyToElements(other, func); + } + + @Override + public DoubleVector addUnsafe(DoubleVector vector2) { + return vector.addUnsafe(vector2); + } + + @Override + public DoubleVector add(DoubleVector vector2) { + return vector.add(vector2); + } + + @Override + public DoubleVector add(double scalar) { + return vector.add(scalar); + } + + @Override + public DoubleVector subtractUnsafe(DoubleVector vector2) { + return vector.subtractUnsafe(vector2); + } + + @Override + public DoubleVector subtract(DoubleVector vector2) { + return vector.subtract(vector2); + } + + @Override + public DoubleVector subtract(double scalar) { + return vector.subtract(scalar); + } + + @Override + public DoubleVector subtractFrom(double scalar) { + return vector.subtractFrom(scalar); + } + + @Override + public DoubleVector multiply(double scalar) { + return vector.multiply(scalar); + } + + @Override + public DoubleVector multiplyUnsafe(DoubleVector vector2) { + return vector.multiplyUnsafe(vector2); + } + + @Override + public DoubleVector multiply(DoubleVector vector2) { + return vector.multiply(vector2); + } + + @Override + public DoubleVector multiply(DoubleMatrix matrix) { + return vector.multiply(matrix); + } + + @Override + public DoubleVector multiplyUnsafe(DoubleMatrix matrix) { + return vector.multiplyUnsafe(matrix); + } + + @Override + public DoubleVector divide(double scalar) { + return vector.divide(scalar); + } + + @Override + public DoubleVector divideFrom(double scalar) { + return vector.divideFrom(scalar); + } + + @Override + public DoubleVector pow(int x) { + return vector.pow(x); + } + + @Override + public DoubleVector abs() { + return vector.abs(); + } + + @Override + public DoubleVector sqrt() { + return vector.sqrt(); + } + + @Override + public double sum() { + return vector.sum(); + } + + @Override + public double dotUnsafe(DoubleVector vector2) { + return vector.dotUnsafe(vector2); + } + + @Override + public double dot(DoubleVector vector2) { + return vector.dot(vector2); + } + + @Override + public DoubleVector slice(int length) { + return vector.slice(length); + } + + @Override + public DoubleVector sliceUnsafe(int length) { + return vector.sliceUnsafe(length); + } + + @Override + public DoubleVector slice(int start, int end) { + return vector.slice(start, end); + } + + @Override + public DoubleVector sliceUnsafe(int start, int end) { + return vector.sliceUnsafe(start, end); + } + + @Override + public double max() { + return vector.max(); + } + + @Override + public double min() { + return vector.min(); + } + + @Override + public double[] toArray() { + return vector.toArray(); + } + + @Override + public DoubleVector deepCopy() { + return new NamedDoubleVector(name, vector.deepCopy()); + } + + @Override + public Iterator iterateNonZero() { + return vector.iterateNonZero(); + } + + @Override + public Iterator iterate() { + return vector.iterate(); + } + + @Override + public boolean isSparse() { + return vector.isSparse(); + } + + @Override + public boolean isNamed() { + return true; + } + + @Override + public String getName() { + return name; + } + + public String toString() { + return name + ": " + vector.toString(); + } + +} Index: examples/src/main/java/org/apache/hama/examples/Kmeans.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/Kmeans.java (revision 1548979) +++ examples/src/main/java/org/apache/hama/examples/Kmeans.java (working copy) @@ -86,7 +86,8 @@ // prepare the input, like deleting old versions and creating centers KMeansBSP.prepareInput(count, k, dimension, conf, in, center, out, fs); } else { - KMeansBSP.prepareInputText(k, conf, in, center, out, fs); + // Set the last argument to TRUE if first column is required to be the key + KMeansBSP.prepareInputText(k, conf, in, center, out, fs, true); in = new Path(in.getParent(), "textinput/in.seq"); } @@ -95,10 +96,9 @@ // just submit the job job.waitForCompletion(true); - List results = KMeansBSP.readOutput(conf, out, fs, 5); + List results = KMeansBSP.readOutput(conf, out, fs, 10); for (String line : results) { System.out.println(line); } - System.out.println("..."); } } Index: ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java (revision 1548979) +++ ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hama.commons.io.VectorWritable; import org.apache.hama.commons.math.DenseDoubleVector; import org.apache.hama.commons.math.DoubleVector; +import org.apache.hama.commons.math.NamedDoubleVector; import org.apache.hama.ml.distance.DistanceMeasurer; import org.apache.hama.ml.distance.EuclidianDistance; import org.apache.hama.util.ReflectionUtils; @@ -80,7 +81,6 @@ public final void setup( BSPPeer peer) throws IOException, InterruptedException { - conf = peer.getConfiguration(); Path centroids = new Path(peer.getConfiguration().get(CENTER_IN_PATH)); @@ -195,6 +195,7 @@ // needs to be broadcasted. final DoubleVector[] newCenterArray = new DoubleVector[centers.length]; final int[] summationCount = new int[centers.length]; + // if our cache is not enabled, iterate over the disk items if (cache == null) { // we have an assignment step @@ -222,6 +223,7 @@ } } } + // now send messages about the local updates to each other peer for (int i = 0; i < newCenterArray.length; i++) { if (newCenterArray[i] != null) { @@ -237,6 +239,7 @@ final int[] summationCount, final DoubleVector key) { final int lowestDistantCenter = getNearestCenter(key); final DoubleVector clusterCenter = newCenterArray[lowestDistantCenter]; + if (clusterCenter == null) { newCenterArray[lowestDistantCenter] = key; } else { @@ -250,6 +253,7 @@ private int getNearestCenter(DoubleVector key) { int lowestDistantCenter = 0; double lowestDistance = Double.MAX_VALUE; + for (int i = 0; i < centers.length; i++) { final double estimatedDistance = distanceMeasurer.measureDistance( centers[i], key); @@ -419,9 +423,19 @@ /** * Reads input text files and writes it to a sequencefile. + * + * @param k + * @param conf + * @param txtIn + * @param center + * @param out + * @param fs + * @param hasKey true if first column is required to be the key. + * @return + * @throws IOException */ public static Path prepareInputText(int k, Configuration conf, Path txtIn, - Path center, Path out, FileSystem fs) throws IOException { + Path center, Path out, FileSystem fs, boolean hasKey) throws IOException { Path in; if (fs.isFile(txtIn)) { @@ -429,7 +443,7 @@ } else { in = new Path(txtIn, "textinput/in.seq"); } - + if (fs.exists(out)) fs.delete(out, true); @@ -454,11 +468,26 @@ String line; while ((line = br.readLine()) != null) { String[] split = line.split("\t"); - DenseDoubleVector vec = new DenseDoubleVector(split.length); - for (int j = 0; j < split.length; j++) { - vec.set(j, Double.parseDouble(split[j])); + int columnLength = split.length; + int indexPos = 0; + if (hasKey) { + columnLength = columnLength - 1; + indexPos++; + } + + DenseDoubleVector vec = new DenseDoubleVector(columnLength); + for (int j = 0; j < columnLength; j++) { + vec.set(j, Double.parseDouble(split[j + indexPos])); + } + + VectorWritable vector; + if (hasKey) { + NamedDoubleVector named = new NamedDoubleVector(split[0], vec); + vector = new VectorWritable(named); + } else { + vector = new VectorWritable(vec); } - VectorWritable vector = new VectorWritable(vec); + dataWriter.append(vector, value); if (k > i) { assert centerWriter != null; Index: ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java =================================================================== --- ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (revision 1548979) +++ ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (working copy) @@ -63,7 +63,7 @@ bw.write(sb.toString()); bw.close(); - in = KMeansBSP.prepareInputText(k, conf, in, center, out, fs); + in = KMeansBSP.prepareInputText(k, conf, in, center, out, fs, false); BSPJob job = KMeansBSP.createJob(conf, in, out, true);