Index: core/pom.xml
===================================================================
--- core/pom.xml	(revision 1546951)
+++ core/pom.xml	(working copy)
@@ -225,6 +225,31 @@
+      <plugin>
+        <groupId>org.codehaus.groovy.maven</groupId>
+        <artifactId>gmaven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <phase>process-test-resources</phase>
+            <goals><goal>execute</goal></goals>
+            <configuration>
+              <source>
+                project.properties.setProperty('hama.pipes.examples.install', System.getProperty("hama.pipes.examples.install"));
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <hama.pipes.examples.install>${hama.pipes.examples.install}</hama.pipes.examples.install>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
       hama-core-${project.version}
Index: core/src/test/java/org/apache/hama/pipes/TestPipes.java
===================================================================
--- core/src/test/java/org/apache/hama/pipes/TestPipes.java	(revision 0)
+++ core/src/test/java/org/apache/hama/pipes/TestPipes.java	(revision 0)
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.KeyValueTextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.commons.io.PipesKeyValueWritable;
+import org.apache.hama.commons.io.PipesVectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.junit.Test;
+
+/**
+ * Test case for {@link PipesBSP}
+ */
+public class TestPipes extends HamaCluster {
+  private static final Log LOG = LogFactory.getLog(TestPipes.class);
+
+  public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install";
+  public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
+  public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
+  public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
+  public static final String HAMA_TMP_OUTPUT = "/tmp/hama-test/";
+  public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
+  public static final int DOUBLE_PRECISION = 6;
+
+  protected HamaConfiguration configuration;
+
+  public TestPipes() {
+    configuration = new HamaConfiguration();
+
+    try {
+      // Cleanup temp Hama locations
+      FileSystem fs = FileSystem.get(configuration);
+      cleanup(fs, new Path(HAMA_TMP_OUTPUT));
+      cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
+      // Remove local temp folder
+      cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
+    configuration
+        .set(DiskQueue.DISK_QUEUE_PATH_KEY, HAMA_TMP_DISK_QUEUE_OUTPUT);
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  @Test
+  public void testPipes() throws Exception {
+
+    assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
+        + " is not defined!", System.getProperty(EXAMPLES_INSTALL_PROPERTY));
+
+    if (System.getProperty(EXAMPLES_INSTALL_PROPERTY).isEmpty()) {
+      LOG.error("System property " + EXAMPLES_INSTALL_PROPERTY
+          + " is empty! Skipping TestPipes!");
+      return;
+    }
+
+    LOG.info(EXAMPLES_INSTALL_PROPERTY + " is defined: '"
+        + System.getProperty(EXAMPLES_INSTALL_PROPERTY) + "'");
+
+    // Setup Paths
+    String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
+    Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC);
+    Path matrixmultiplicationExec = new Path(examplesInstallPath
+        + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
+    Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/in");
+    Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/out");
+
+    FileSystem fs = FileSystem.get(configuration);
+
+    // *** Summation Test ***
+    // Generate Summation input
+    BigDecimal sum = writeSummationInputFile(fs, inputPath);
+
+    // Run Summation example
+    runProgram(getSummationJob(configuration), summationExec, inputPath,
+        outputPath, 3, this.numOfGroom);
+
+    // Verify output
+    verifySummationOutput(configuration, outputPath, sum);
+
+    // Clean input and output folder
+    cleanup(fs, inputPath);
+    cleanup(fs, outputPath);
+
+    // *** MatrixMultiplication Test ***
+    // Generate matrix dimensions
+    Random rand = new Random();
+    // (0-19) + 11 -> between 11-30
+    int rows = rand.nextInt(20) + 11;
+    int cols = rand.nextInt(20) + 11;
+
+    // Generate MatrixMultiplication input
+    double[][] matrixA = createRandomMatrix(rows, cols, rand);
+    double[][] matrixB = createRandomMatrix(cols, rows, rand);
+
+    Path matrixAPath = writeMatrix(configuration, matrixA, new Path(inputPath,
+        "matrixA.seq"), false);
+    Path transposedMatrixBPath = writeMatrix(configuration, matrixB, new Path(
+        inputPath, "transposedMatrixB.seq"), true);
+
+    // Run MatrixMultiplication example
+    runProgram(
+        getMatrixMultiplicationJob(configuration, transposedMatrixBPath),
+        matrixmultiplicationExec, matrixAPath, outputPath, 3, this.numOfGroom);
+
+    // Verify output
+    double[][] matrixC = multiplyMatrix(matrixA, matrixB);
+    verifyMatrixMultiplicationOutput(configuration, outputPath, matrixC);
+
+    // Remove local temp folder
+    cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+  }
+
+  static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
+    BSPJob bsp = new BSPJob(conf);
+    bsp.setInputFormat(KeyValueTextInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(Text.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.set("bsp.message.class", DoubleWritable.class.getName());
+    return bsp;
+  }
+
+  static BSPJob getMatrixMultiplicationJob(HamaConfiguration conf,
+      Path transposedMatrixB) throws IOException {
+    BSPJob bsp = new BSPJob(conf);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(IntWritable.class);
+    bsp.setInputValueClass(PipesVectorWritable.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(IntWritable.class);
+    bsp.setOutputValueClass(PipesVectorWritable.class);
+    bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
+    bsp.setPartitioner(PipesPartitioner.class);
+    // sort sent messages
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
+    bsp.set("hama.mat.mult.B.path", transposedMatrixB.toString());
+    return bsp;
+  }
+
+  static BigDecimal writeSummationInputFile(FileSystem fs, Path dir)
+      throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    Random rand = new Random();
+    double rangeMin = 0;
+    double rangeMax = 100;
+    BigDecimal sum = new BigDecimal(0);
+    // loop between 50 and 149 times
+    for (int i = 0; i < rand.nextInt(100) + 50; i++) {
+      // generate key value pair inputs
+      double randomValue = rangeMin + (rangeMax - rangeMin) * rand.nextDouble();
+      String truncatedValue = new BigDecimal(randomValue).setScale(
+          DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).toString();
+
+      String line = "key" + (i + 1) + "\t" + truncatedValue + "\n";
+      out.writeBytes(line);
+
+      sum = sum.add(new BigDecimal(truncatedValue));
+      LOG.info("input[" + i + "]: '" + line + "' sum: " + sum.toString());
+    }
+    out.close();
+    return sum;
+  }
+
+  static double[][] createRandomMatrix(int rows, int columns, Random rand) {
+    LOG.info("createRandomMatrix rows: " + rows + " cols: " + columns);
+    final double[][] matrix = new double[rows][columns];
+    double rangeMin = 0;
+    double rangeMax = 100;
+
+    for (int i = 0; i < rows; i++) {
+      for (int j = 0; j < columns; j++) {
+        double randomValue = rangeMin + (rangeMax - rangeMin)
+            * rand.nextDouble();
+        matrix[i][j] = new BigDecimal(randomValue).setScale(DOUBLE_PRECISION,
+            BigDecimal.ROUND_DOWN).doubleValue();
+        // matrix[i][j] = rand.nextInt(9) + 1;
+      }
+    }
+    return matrix;
+  }
+
+  static Path writeMatrix(Configuration conf, double[][] matrix, Path path,
+      boolean saveTransposed) {
+    // Write matrix to DFS
+    SequenceFile.Writer writer = null;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
+          PipesVectorWritable.class);
+
+      // Transpose Matrix before saving
+      if (saveTransposed) {
+        int rows = matrix.length;
+        int columns = matrix[0].length;
+        double[][] transposed = new double[columns][rows];
+        for (int i = 0; i < rows; i++) {
+          for (int j = 0; j < columns; j++) {
+            transposed[j][i] = matrix[i][j];
+          }
+        }
+        matrix = transposed;
+      }
+
+      LOG.info("writeRandomDistributedRowMatrix path: " + path
+          + " saveTransposed: " + saveTransposed);
+      for (int i = 0; i < matrix.length; i++) {
+        DenseDoubleVector rowVector = new DenseDoubleVector(matrix[i]);
+        writer.append(new IntWritable(i), new PipesVectorWritable(rowVector));
+        LOG.info("IntWritable: " + i + " PipesVectorWritable: "
+            + rowVector.toString());
+      }
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    return path;
+  }
+
+  static double[][] multiplyMatrix(double[][] matrixA, double[][] matrixB) {
+    final double[][] matrixC = new double[matrixA.length][matrixB[0].length];
+    int m = matrixA.length;
+    int n = matrixA[0].length;
+    int p = matrixB[0].length;
+
+    for (int k = 0; k < n; k++) {
+      for (int i = 0; i < m; i++) {
+        for (int j = 0; j < p; j++) {
+          matrixC[i][j] = matrixC[i][j] + matrixA[i][k] * matrixB[k][j];
+        }
+      }
+    }
+    return matrixC;
+  }
+
+  static void verifyOutput(HamaConfiguration conf, Path outputPath,
+      String[] expectedResults) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        if (status.getLen() > 0) {
+          LOG.info("Output File: " + status.getPath());
+          BufferedReader br = new BufferedReader(new InputStreamReader(
+              fs.open(status.getPath())));
+          try {
+            String line = "";
+            int i = 0;
+            while ((line = br.readLine()) != null) {
+              LOG.info("output[" + i + "]: '" + line + "'");
+              LOG.info("expected[" + i + "]: '" + expectedResults[i] + "'");
"'", + expectedResults[i], line); + i++; + } + } finally { + br.close(); + } + } + } + } + } + + static void verifySummationOutput(HamaConfiguration conf, Path outputPath, + BigDecimal sum) throws IOException { + FileSystem fs = FileSystem.get(conf); + FileStatus[] listStatus = fs.listStatus(outputPath); + for (FileStatus status : listStatus) { + if (!status.isDir()) { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, + status.getPath(), conf); + Text key = new Text(); + DoubleWritable value = new DoubleWritable(); + if (reader.next(key, value)) { + LOG.info("Output File: " + status.getPath()); + LOG.info("key: '" + key + "' value: '" + value + "' expected: '" + + sum.doubleValue() + "'"); + assertEquals("Expected value: '" + sum + "' != '" + value + "'", + sum.doubleValue(), value.get(), + Math.pow(10, (DOUBLE_PRECISION * -1))); + } + reader.close(); + } + } + } + + static void verifyMatrixMultiplicationOutput(HamaConfiguration conf, + Path outputPath, double[][] matrix) throws IOException { + FileSystem fs = FileSystem.get(conf); + FileStatus[] listStatus = fs.listStatus(outputPath); + for (FileStatus status : listStatus) { + if (!status.isDir()) { + LOG.info("Output File: " + status.getPath()); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, + status.getPath(), conf); + IntWritable key = new IntWritable(); + PipesVectorWritable value = new PipesVectorWritable(); + int rowIdx = 0; + while (reader.next(key, value)) { + + assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get() + + "'", rowIdx, key.get()); + + DoubleVector rowVector = value.getVector(); + LOG.info("key: " + key.get() + " value: " + rowVector.toString()); + + for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) { + + double colValue = rowVector.get(colIdx); + + LOG.info("value[" + rowIdx + "," + colIdx + "]: " + colValue + + " expected: " + matrix[rowIdx][colIdx]); + + assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx] + + "' != '" + colValue + "' in row: " + rowIdx + " values: " + + rowVector.toString(), matrix[rowIdx][colIdx], colValue, + Math.pow(10, (DOUBLE_PRECISION * -1))); + } + rowIdx++; + } + reader.close(); + } + } + } + + static void cleanup(FileSystem fs, Path p) throws IOException { + fs.delete(p, true); + assertFalse(p.getName() + " not cleaned up", fs.exists(p)); + } + + static void runProgram(BSPJob bsp, Path program, Path inputPath, + Path outputPath, int numBspTasks, int numOfGroom) throws IOException { + + HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration(); + bsp.setJobName("Test Hama Pipes " + program.getName()); + bsp.setBspClass(PipesBSP.class); + + FileInputFormat.setInputPaths(bsp, inputPath); + FileOutputFormat.setOutputPath(bsp, outputPath); + + Submitter.setIsJavaRecordReader(conf, true); + Submitter.setIsJavaRecordWriter(conf, true); + + BSPJobClient jobClient = new BSPJobClient(conf); + conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); + + // Set bspTaskNum + ClusterStatus cluster = jobClient.getClusterStatus(false); + assertEquals(numOfGroom, cluster.getGroomServers()); + bsp.setNumBspTask(numBspTasks); + + // Copy binary to DFS + Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application"); + FileSystem fs = FileSystem.get(conf); + fs.delete(testExec.getParent(), true); + fs.copyFromLocalFile(program, testExec); + + // Set Executable + Submitter.setExecutable(conf, fs.makeQualified(testExec).toString()); + + // Run bspJob + Submitter.runJob(bsp); + + LOG.info("Client finishes execution job"); + + // check 
+    // check output
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    assertEquals(listStatus.length, numBspTasks);
+  }
+}
Index: c++/pom.xml
===================================================================
--- c++/pom.xml	(revision 1546951)
+++ c++/pom.xml	(working copy)
@@ -42,6 +42,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.7</version>
         make
@@ -61,6 +62,7 @@
+
@@ -76,6 +78,7 @@
+        true