Index: ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java (revision 1555510) +++ ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java (working copy) @@ -244,8 +244,8 @@ // add the vector to the center newCenterArray[lowestDistantCenter] = newCenterArray[lowestDistantCenter] .addUnsafe(key); - summationCount[lowestDistantCenter]++; } + summationCount[lowestDistantCenter]++; } private int getNearestCenter(DoubleVector key) { @@ -514,7 +514,7 @@ fs.delete(out, true); if (fs.exists(center)) - fs.delete(out, true); + fs.delete(center, true); if (fs.exists(in)) fs.delete(in, true); Index: ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java =================================================================== --- ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (revision 1555510) +++ ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (working copy) @@ -18,70 +18,147 @@ package org.apache.hama.ml.kmeans; import java.io.BufferedWriter; +import java.io.IOException; import java.io.OutputStreamWriter; import java.util.HashMap; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; import org.apache.hama.commons.math.DoubleVector; public class TestKMeansBSP extends TestCase { + public static final String TMP_OUTPUT = "/tmp/clustering/"; public void testRunJob() throws Exception { - Configuration conf = new Configuration(); - Path in = new Path("/tmp/clustering/in/in.txt"); - Path out = new Path("/tmp/clustering/out/"); + + Configuration conf = new HamaConfiguration(); FileSystem fs = FileSystem.get(conf); - Path center = null; + // Test with numBspTask = 1 try { - center = new Path(in.getParent(), "center/cen.seq"); + test(conf, fs, 1); + } finally { + fs.delete(new Path(TMP_OUTPUT), true); + } - Path centerOut = new Path(out, "center/center_output.seq"); - conf.set(KMeansBSP.CENTER_IN_PATH, center.toString()); - conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString()); - int iterations = 10; - conf.setInt(KMeansBSP.MAX_ITERATIONS_KEY, iterations); - int k = 1; + // Test with numBspTask = 2 + try { + test(conf, fs, 2); + } finally { + fs.delete(new Path(TMP_OUTPUT), true); + } - FSDataOutputStream create = fs.create(in); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(create)); + // Test with numBspTask = 3 + try { + test(conf, fs, 3); + } finally { + fs.delete(new Path(TMP_OUTPUT), true); + } + } + + /** + * Test + * + * Create 101 input vectors of dimension two + * + * Input vectors: (0,0) (1,1) (2,2) ... (100,100) + * + * k = 1, maxIterations = 10 + * + * Resulting center should be (50,50) + */ + private void test(Configuration conf, FileSystem fs, int numBspTask) + throws IOException, InterruptedException, ClassNotFoundException { + + Path in = new Path(TMP_OUTPUT + "in"); + Path out = new Path(TMP_OUTPUT + "out"); + Path centerIn = new Path(TMP_OUTPUT + "center/center_input.seq"); + Path centerOut = new Path(TMP_OUTPUT + "center/center_output.seq"); + conf.set(KMeansBSP.CENTER_IN_PATH, centerIn.toString()); + conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString()); + + int k = 1; + int iterations = 10; + conf.setInt(KMeansBSP.MAX_ITERATIONS_KEY, iterations); + + in = generateInputText(k, conf, fs, in, centerIn, out, numBspTask); + + BSPJob job = KMeansBSP.createJob(conf, in, out, true); + job.setNumBspTask(numBspTask); + + // just submit the job + boolean result = job.waitForCompletion(true); + + assertEquals(true, result); + + HashMap centerMap = KMeansBSP.readClusterCenters( + conf, out, centerOut, fs); + System.out.println(centerMap); + + assertEquals(1, centerMap.size()); // because k = 1 + + DoubleVector doubleVector = centerMap.get(0); + assertEquals(Double.valueOf(50), doubleVector.get(0)); + assertEquals(Double.valueOf(50), doubleVector.get(1)); + } + + private Path generateInputText(int k, Configuration conf, FileSystem fs, + Path in, Path centerIn, Path out, int numBspTask) throws IOException { + + int maxInputs = 101; // 0..100 + int partInputs = maxInputs / numBspTask; + Path parts = new Path(in, "parts"); + + for (int part = 0; part < numBspTask; part++) { + Path partIn = new Path(parts, "part" + part + "/input.txt"); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter( + fs.create(partIn))); StringBuilder sb = new StringBuilder(); - - for (int i = 0; i < 100; i++) { - sb.append(i); + for (int i = 0; i < partInputs; i++) { + sb.append((partInputs * part) + i); sb.append('\t'); - sb.append(i); + sb.append((partInputs * part) + i); sb.append('\n'); } + // if last part write out missing inputs + if ((numBspTask > 1) && (part + 1 == numBspTask)) { + for (int i = (partInputs * part + 1); i < maxInputs; i++) { + sb.append(i); + sb.append('\t'); + sb.append(i); + sb.append('\n'); + } + } + + // Debug output + System.out.println("part: " + partIn.toString()); + System.out.println(sb.toString()); + bw.write(sb.toString()); bw.close(); - in = KMeansBSP.prepareInputText(k, conf, in, center, out, fs, false); + // Convert input text to SequenceFiles + Path seqFile = null; + if (part == 0) { + seqFile = KMeansBSP.prepareInputText(k, conf, partIn, centerIn, out, + fs, false); + } else { + seqFile = KMeansBSP.prepareInputText(0, conf, partIn, new Path(centerIn + + "_empty.seq"), out, fs, false); + } - BSPJob job = KMeansBSP.createJob(conf, in, out, true); + fs.moveFromLocalFile(seqFile, new Path(parts, "part" + part + ".seq")); + fs.delete(seqFile.getParent(), true); + fs.delete(partIn.getParent(), true); + } - // just submit the job - boolean result = job.waitForCompletion(true); - - assertEquals(true, result); - - HashMap centerMap = KMeansBSP.readClusterCenters( - conf, out, centerOut, fs); - System.out.println(centerMap); - assertEquals(1, centerMap.size()); - DoubleVector doubleVector = centerMap.get(0); - assertTrue(doubleVector.get(0) >= 50 && doubleVector.get(0) < 51); - assertTrue(doubleVector.get(1) >= 50 && doubleVector.get(1) < 51); - } finally { - fs.delete(new Path("/tmp/clustering"), true); - } + return parts; } } Index: examples/src/main/java/org/apache/hama/examples/Kmeans.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/Kmeans.java (revision 1555510) +++ examples/src/main/java/org/apache/hama/examples/Kmeans.java (working copy) @@ -67,10 +67,12 @@ Path out = new Path(args[1]); FileSystem fs = FileSystem.get(conf); Path center = null; - if (fs.isFile(in)) + if (fs.isFile(in)) { center = new Path(in.getParent(), "center/cen.seq"); - else + } else { center = new Path(in, "center/cen.seq"); + in = new Path(in, "input.seq"); + } Path centerOut = new Path(out, "center/center_output.seq"); conf.set(KMeansBSP.CENTER_IN_PATH, center.toString()); conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString());