diff --git README.md README.md
new file mode 100644
index 0000000..a6d4ead
--- /dev/null
+++ README.md
@@ -0,0 +1,13 @@
+### **This is not the original Apache Hama!**
+This is an experimental copy forked from the original Apache Hama at https://github.com/apache/hama.
+This copy is used to implement the multilayer perceptron (a kind of artificial neural network).
+
+
+For the latest information about Hama, please visit our website at:
+
+ http://hama.apache.org/
+
+and our wiki, at:
+
+ http://wiki.apache.org/hama/
+
diff --git README.txt README.txt
deleted file mode 100644
index 03f82bb..0000000
--- README.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-For the latest information about Hama, please visit our website at:
-
- http://hama.apache.org/
-
-and our wiki, at:
-
- http://wiki.apache.org/hama/
-
diff --git ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java
new file mode 100644
index 0000000..eeb703a
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java
@@ -0,0 +1,25 @@
+package org.apache.hama.ml.perception;
+
+/**
+ * The common base class for cost functions.
+ */
+public abstract class CostFunction {
+
+ /**
+   * Evaluate the cost given the target value and the actual value.
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The cost.
+ */
+ public abstract double getCost(double target, double actual);
+
+ /**
+   * Get the partial derivative of the cost with respect to the actual value.
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The partial derivative.
+ */
+ public abstract double getPartialDerivative(double target, double actual);
+
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java
new file mode 100644
index 0000000..6147b66
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java
@@ -0,0 +1,25 @@
+package org.apache.hama.ml.perception;
+
+/**
+ * The cost function factory that generates the cost function
+ * by name.
+ */
+public class CostFunctionFactory {
+
+ /**
+ * Get the cost function according to the name.
+ * If no matched cost function is found, return the
+ * SquaredError by default.
+ * @param name The name of the cost function.
+ * @return The cost function instance.
+ */
+ public static CostFunction getCostFunction(String name) {
+ if (name.equalsIgnoreCase("SquaredError")) {
+ return new SquaredError();
+ }
+ else if (name.equalsIgnoreCase("LogisticError")) {
+ return new LogisticCostFunction();
+ }
+ return new SquaredError();
+ }
+}
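
For orientation, a minimal usage sketch of the factory (hypothetical caller code, not part of the patch; the numeric values are illustrative only):

```java
package org.apache.hama.ml.perception;

// Hypothetical example: obtain cost functions by name and evaluate them.
// Unrecognized names fall back to SquaredError.
public class CostFunctionFactoryExample {
  public static void main(String[] args) {
    CostFunction squared = CostFunctionFactory.getCostFunction("SquaredError");
    CostFunction logistic = CostFunctionFactory.getCostFunction("LogisticError");

    // 0.5 * (1.0 - 0.8)^2 = 0.02
    System.out.println(squared.getCost(1.0, 0.8));
    // -0.999 / 0.8 + 0.001 / 0.2 = -1.24375 (the target is clamped to 0.999)
    System.out.println(logistic.getPartialDerivative(1.0, 0.8));
  }
}
```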
diff --git ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java
new file mode 100644
index 0000000..e4eee5a
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java
@@ -0,0 +1,35 @@
+package org.apache.hama.ml.perception;
+
+/**
+ * The logistic cost function.
+ *
+ *
+ * cost(t, y) = - t * log(y) - (1 - t) * log(1 - y),
+ * where t denotes the target value, y denotes the estimated value.
+ *
+ */
+public class LogisticCostFunction extends CostFunction {
+
+ @Override
+ public double getCost(double target, double actual) {
+ return -target * Math.log(actual) - (1 - target) * Math.log(1 - actual);
+ }
+
+ @Override
+ public double getPartialDerivative(double target, double actual) {
+ if (actual == 1) {
+ actual = 0.999;
+ }
+ else if (actual == 0) {
+ actual = 0.001;
+ }
+ if (target == 1) {
+ target = 0.999;
+ }
+ else if (target == 0) {
+ target = 0.001;
+ }
+ return -target / actual + (1 - target) / (1 - actual);
+ }
+
+}
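
For reference, the value returned by getPartialDerivative follows directly from the cost above (a sketch of the algebra; the clamping of values to [0.001, 0.999] in the code only guards against division by zero):

```latex
C(t, y) = -t \log y - (1 - t) \log(1 - y)
\quad\Longrightarrow\quad
\frac{\partial C}{\partial y} = -\frac{t}{y} + \frac{1 - t}{1 - y}
```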
diff --git ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java
new file mode 100644
index 0000000..1aa2576
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java
@@ -0,0 +1,26 @@
+package org.apache.hama.ml.perception;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * MLPMessage is used to hold the parameters that need
+ * to be sent between the tasks.
+ */
+public abstract class MLPMessage implements Writable {
+ protected boolean terminated;
+
+ public MLPMessage(boolean terminated) {
+ setTerminated(terminated);
+ }
+
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
+
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java
new file mode 100644
index 0000000..e4323ea
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java
@@ -0,0 +1,128 @@
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ml.math.DoubleVector;
+
+/**
+ * MultiLayerPerceptron defines the common behavior of all the concrete multilayer perceptrons.
+ */
+public abstract class MultiLayerPerceptron {
+
+ /* The trainer for the model */
+ protected PerceptronTrainer trainer;
+ /* The file path that contains the model meta-data */
+ protected String modelPath;
+
+ /* Model meta-data */
+ protected String MLPType;
+ protected double learningRate;
+ protected boolean regularization;
+ protected double momentum;
+ protected int numberOfLayers;
+ protected String squashingFunctionName;
+ protected String costFunctionName;
+ protected int[] layerSizeArray;
+
+ protected CostFunction costFunction;
+ protected SquashingFunction squashingFunction;
+
+ /**
+ * Initialize the MLP.
+   * @param learningRate A larger learningRate makes the MLP learn more aggressively.
+   * @param regularization Turning on regularization makes the MLP less likely to overfit.
+   * @param momentum The momentum makes the historical adjustment affect the current adjustment.
+   * @param squashingFunctionName The name of the squashing function.
+   * @param costFunctionName The name of the cost function.
+   * @param layerSizeArray The number of neurons for each layer. Note that each layer (except the output layer) additionally contains a bias neuron, so its actual size is one more than the specified size.
+ */
+ public MultiLayerPerceptron(double learningRate, boolean regularization, double momentum,
+ String squashingFunctionName, String costFunctionName, int[] layerSizeArray) {
+ this.learningRate = learningRate;
+    this.regularization = regularization;
+    this.momentum = momentum;
+ this.squashingFunctionName = squashingFunctionName;
+ this.costFunctionName = costFunctionName;
+ this.layerSizeArray = layerSizeArray;
+ this.numberOfLayers = this.layerSizeArray.length;
+
+ this.costFunction = CostFunctionFactory.getCostFunction(this.costFunctionName);
+ this.squashingFunction = SquashingFunctionFactory.getSquashingFunction(this.squashingFunctionName);
+ }
+
+ /**
+ * Initialize a multi-layer perceptron with existing model.
+ * @param modelPath Location of existing model meta-data.
+ */
+ public MultiLayerPerceptron(String modelPath) {
+ this.modelPath = modelPath;
+ }
+
+ /**
+ * Train the model with given data.
+   * This method invokes a perceptron training BSP job to train the model.
+   * It then writes the model to modelPath.
+ * @param dataInputPath The path of the data.
+ * @param trainingParams Extra parameters for training.
+ */
+  public abstract void train(Path dataInputPath, Map<String, String> trainingParams) throws Exception;
+
+ /**
+ * Get the output based on the input instance and the learned model.
+ * @param featureVector The feature of an instance to feed the perceptron.
+ * @return The results.
+ */
+ public abstract DoubleVector output(DoubleVector featureVector) throws Exception;
+
+ /**
+ * Read the model meta-data from the specified location.
+ * @throws IOException
+ */
+ protected abstract void readFromModel() throws IOException;
+
+ /**
+ * Write the model data to specified location.
+ * @param modelPath The location in file system to store the model.
+ * @throws IOException
+ */
+ public abstract void writeModelToFile(String modelPath) throws IOException;
+
+ public String getModelPath() {
+ return modelPath;
+ }
+
+ public String getMLPType() {
+ return MLPType;
+ }
+
+ public double getLearningRate() {
+ return learningRate;
+ }
+
+ public boolean isRegularization() {
+ return regularization;
+ }
+
+ public double getMomentum() {
+ return momentum;
+ }
+
+ public int getNumberOfLayers() {
+ return numberOfLayers;
+ }
+
+ public String getSquashingFunctionName() {
+ return squashingFunctionName;
+ }
+
+ public String getCostFunctionName() {
+ return costFunctionName;
+ }
+
+ public int[] getLayerSizeArray() {
+ return layerSizeArray;
+ }
+
+}
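
For orientation, a sketch of how a concrete subclass of this class is driven end to end; it mirrors the XOR test near the bottom of this patch, and the data path, model path, and parameter values are placeholders only:

```java
package org.apache.hama.ml.perception;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hama.ml.math.DenseDoubleVector;
import org.apache.hama.ml.math.DoubleVector;

// Hypothetical driver, not part of the patch.
public class MLPUsageSketch {
  public static void main(String[] args) throws Exception {
    // 2 input neurons, one hidden layer of 5 neurons, 1 output neuron
    int[] layerSizeArray = new int[] {2, 5, 1};
    MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(
        0.6, false, 0.0, "Sigmoid", "SquaredError", layerSizeArray);

    // configuration keys read by PerceptronTrainer / SmallMLPTrainer
    Map<String, String> trainingParams = new HashMap<String, String>();
    trainingParams.put("training.mode", "minibatch.gradient.descent");
    trainingParams.put("training.iteration", "1000");
    trainingParams.put("training.batch.size", "100");
    trainingParams.put("tasks", "3");
    trainingParams.put("modelPath", "/tmp/xorModel.data");

    // the input is expected to be a SequenceFile of (LongWritable, VectorWritable) pairs
    mlp.train(new Path("/tmp/xor"), trainingParams);

    DoubleVector result = mlp.output(new DenseDoubleVector(new double[] {0, 1}));
    System.out.println(result);
  }
}
```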
diff --git ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java
new file mode 100644
index 0000000..2e55647
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java
@@ -0,0 +1,73 @@
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.ml.writable.VectorWritable;
+
+/**
+ * The trainer that is used to train the perceptron with BSP.
+ * The trainer reads the training data and obtains the trained parameters of the model.
+ *
+ */
+public abstract class PerceptronTrainer extends
+    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, MLPMessage> {
+
+ protected Configuration conf;
+ protected int maxIteration;
+ protected int batchSize;
+ protected String trainingMode;
+
+ @Override
+ public void setup(BSPPeer peer)
+ throws IOException, SyncException, InterruptedException {
+ conf = peer.getConfiguration();
+ trainingMode = conf.get("training.mode");
+ batchSize = conf.getInt("training.batch.size", 100); // mini-batch by default
+ this.extraSetup(peer);
+ }
+
+ /**
+ * Handle extra setup for sub-classes.
+ * @param peer
+ * @throws IOException
+ * @throws SyncException
+ * @throws InterruptedException
+ */
+ protected void extraSetup(
+ BSPPeer peer)
+ throws IOException, SyncException, InterruptedException {
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public abstract void bsp(BSPPeer peer)
+ throws IOException, SyncException, InterruptedException;
+
+ @Override
+ public void cleanup(BSPPeer peer)
+ throws IOException {
+
+ this.extraCleanup(peer);
+ }
+
+ /**
+ * Handle extra cleanup for sub-classes.
+ * @param peer
+ * @throws IOException
+ * @throws SyncException
+ * @throws InterruptedException
+ */
+ protected void extraCleanup(
+ BSPPeer peer)
+ throws IOException {
+ }
+
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java
new file mode 100644
index 0000000..7bda8a6
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java
@@ -0,0 +1,22 @@
+package org.apache.hama.ml.perception;
+
+import org.apache.hama.ml.math.DoubleVector;
+
+/**
+ * The Sigmoid function
+ *
+ * f(z) = 1 / (1 + e^{-z})
+ *
+ */
+public class Sigmoid extends SquashingFunction {
+
+ @Override
+ public double calculate(int index, double value) {
+ return 1.0 / (1 + Math.exp(-value));
+ }
+
+ @Override
+  public double getDerivative(double value) {
+    // value is expected to be the sigmoid output f(z); f'(z) = f(z) * (1 - f(z))
+    return value * (1 - value);
+ }
+}
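
The derivative above is expressed in terms of the already-squashed output, which is how it is invoked during back-propagation (a sketch of the identity):

```latex
f(z) = \frac{1}{1 + e^{-z}}, \qquad
f'(z) = f(z)\bigl(1 - f(z)\bigr)
\;\;\Longrightarrow\;\;
\texttt{getDerivative}(v) = v\,(1 - v), \quad v = f(z)
```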
diff --git ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java
new file mode 100644
index 0000000..354d9ad
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java
@@ -0,0 +1,68 @@
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DoubleMatrix;
+import org.apache.hama.ml.writable.MatrixWritable;
+
+/**
+ * SmallMLPMessage is used to exchange information for the
+ * {@link SmallMultiLayerPerceptron}.
+ * It sends the whole set of parameter matrices from one task to another.
+ */
+public class SmallMLPMessage extends MLPMessage {
+
+ private int owner; // the ID of the task who creates the message
+ private DenseDoubleMatrix[] weightUpdatedMatrices;
+ private int numOfMatrices;
+
+ public SmallMLPMessage(int owner, boolean terminated, DenseDoubleMatrix[] mat) {
+ super(terminated);
+ this.owner = owner;
+ this.weightUpdatedMatrices = mat;
+ this.numOfMatrices = this.weightUpdatedMatrices == null? 0 : this.weightUpdatedMatrices.length;
+ }
+
+ /**
+ * Get the owner task Id of the message.
+   * @return The owner task Id.
+ */
+ public int getOwner() {
+ return owner;
+ }
+
+ /**
+ * Get the updated weight matrices.
+   * @return The updated weight matrices.
+ */
+ public DenseDoubleMatrix[] getWeightsUpdatedMatrices() {
+ return this.weightUpdatedMatrices;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.owner = input.readInt();
+ this.terminated = input.readBoolean();
+ this.numOfMatrices = input.readInt();
+ this.weightUpdatedMatrices = new DenseDoubleMatrix[this.numOfMatrices];
+ for (int i = 0; i < this.numOfMatrices; ++i) {
+ this.weightUpdatedMatrices[i] = (DenseDoubleMatrix)MatrixWritable.read(input);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(this.owner);
+ output.writeBoolean(this.terminated);
+ output.writeInt(this.numOfMatrices);
+ for (int i = 0; i < this.numOfMatrices; ++i) {
+ MatrixWritable.write(this.weightUpdatedMatrices[i], output);
+ }
+ }
+
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java
new file mode 100644
index 0000000..9fb405d
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java
@@ -0,0 +1,291 @@
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.writable.VectorWritable;
+
+/**
+ * The perceptron trainer for small scale MLP.
+ */
+public class SmallMLPTrainer extends PerceptronTrainer {
+
+ private static final Log LOG = LogFactory.getLog(SmallMLPTrainer.class);
+  /* used by master only, to check whether all slaves have finished reading */
+ private BitSet statusSet;
+
+ private int numTrainingInstanceRead = 0;
+  /* Once the reader reaches EOF, the training procedure will be terminated */
+ private boolean terminateTraining = false;
+
+ private SmallMultiLayerPerceptron inMemoryPerceptron;
+
+ private int[] layerSizeArray;
+
+
+ @Override
+ protected void extraSetup(
+ BSPPeer peer) {
+
+ this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+
+    String outputModelPath = conf.get("modelPath");
+    if (outputModelPath == null || outputModelPath.trim().length() == 0) {
+      throw new IllegalArgumentException("Please specify output model path.");
+    }
+
+ String modelPath = conf.get("existingModelPath");
+ if (modelPath == null || modelPath.trim().length() == 0) { // build model from scratch
+ String MLPType = conf.get("MLPType");
+ double learningRate = Double.parseDouble(conf.get("learningRate"));
+ boolean regularization = Boolean.parseBoolean(conf.get("regularization"));
+ double momentum = Double.parseDouble(conf.get("momentum"));
+ String squashingFunctionName = conf.get("squashingFunctionName");
+ String costFunctionName = conf.get("costFunctionName");
+ String[] layerSizeArrayStr = conf.get("layerSizeArray").trim().split(" ");
+ this.layerSizeArray = new int[layerSizeArrayStr.length];
+ for (int i = 0; i < this.layerSizeArray.length; ++i) {
+ this.layerSizeArray[i] = Integer.parseInt(layerSizeArrayStr[i]);
+ }
+
+ this.inMemoryPerceptron = new SmallMultiLayerPerceptron(learningRate, regularization, momentum,
+ squashingFunctionName, costFunctionName, layerSizeArray);
+ LOG.info("Training model from scratch.");
+ }
+ else { // read model from existing data
+ this.inMemoryPerceptron = new SmallMultiLayerPerceptron(modelPath);
+ LOG.info("Training with existing model.");
+ }
+
+ }
+
+ @Override
+ protected void extraCleanup(
+ BSPPeer peer
+ ) {
+ LOG.info(String.format("Task %d totally read %d records.\n", peer.getPeerIndex(), this.numTrainingInstanceRead));
+ // master write learned model to disk
+ if (peer.getPeerIndex() == 0) {
+ try {
+ LOG.info(String.format("Master write learned model to %s\n", conf.get("modelPath")));
+ this.inMemoryPerceptron.writeModelToFile(conf.get("modelPath"));
+ } catch (IOException e) {
+ System.err.println("Please set a correct model path.");
+ }
+ }
+ }
+
+ @Override
+ public void bsp(BSPPeer peer)
+ throws IOException, SyncException, InterruptedException {
+ LOG.info("Start training...");
+ if (trainingMode.equalsIgnoreCase("minibatch.gradient.descent")) {
+ LOG.info("Training Mode: minibatch.gradient.descent");
+ trainByMinibatch(peer);
+ }
+
+ LOG.info("Finished.");
+ }
+
+ /**
+   * Train the MLP with mini-batch gradient descent.
+ * @param peer
+ * @throws IOException
+ * @throws SyncException
+ * @throws InterruptedException
+ */
+ private void trainByMinibatch(
+ BSPPeer peer)
+ throws IOException, SyncException, InterruptedException {
+
+ int maxIteration = conf.getInt("training.iteration", 1);
+ LOG.info("# of Training Iteration: " + maxIteration);
+
+ for (int i = 0; i < maxIteration; ++i) {
+ LOG.info(String.format("Iteration [%d] begins...", i));
+ peer.reopenInput();
+ // reset status
+ if (peer.getPeerIndex() == 0) {
+ this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+ }
+ this.terminateTraining = false;
+ peer.sync();
+ while (true) {
+        // each slave task updates weights according to training data
+ boolean terminate = updateWeights(peer);
+ peer.sync();
+
+ // master merges the updates
+ if (peer.getPeerIndex() == 0) {
+ mergeUpdate(peer);
+ }
+ peer.sync();
+
+ if (terminate) {
+ break;
+ }
+ }
+
+ }
+
+ }
+
+ /**
+   * Merge the updates from the slave tasks.
+ * @param peer
+ * @throws IOException
+ */
+ private void mergeUpdate(
+ BSPPeer peer)
+ throws IOException {
+ // initialize the cache
+ DenseDoubleMatrix[] mergedUpdates = this.getZeroWeightMatrices();
+
+ int numOfPartitions = peer.getNumCurrentMessages();
+
+ // aggregates the weights update
+ while (peer.getNumCurrentMessages() > 0) {
+ SmallMLPMessage message = (SmallMLPMessage)peer.getCurrentMessage();
+ if (message.isTerminated()) {
+ this.statusSet.set(message.getOwner());
+ }
+
+ DenseDoubleMatrix[] weightUpdates = message.getWeightsUpdatedMatrices();
+ for (int m = 0; m < mergedUpdates.length; ++m) {
+ mergedUpdates[m] = (DenseDoubleMatrix)mergedUpdates[m].add(weightUpdates[m]);
+ }
+ }
+
+ if (numOfPartitions != 0) {
+ // calculate the global mean (the mean of batches from all slave tasks) of the weight updates
+ for (int m = 0; m < mergedUpdates.length; ++m) {
+ mergedUpdates[m] = (DenseDoubleMatrix)mergedUpdates[m].divide(numOfPartitions);
+ }
+
+      // check if all tasks have finished reading data
+ if (this.statusSet.cardinality() == conf.getInt("tasks", 1)) {
+ this.terminateTraining = true;
+ }
+
+ // update the weight matrices
+ this.inMemoryPerceptron.updateWeightMatrices(mergedUpdates);
+ }
+
+ // broadcast updated weight matrices
+ for (String peerName : peer.getAllPeerNames()) {
+ SmallMLPMessage msg = new SmallMLPMessage(peer.getPeerIndex(), this.terminateTraining,
+ this.inMemoryPerceptron.getWeightMatrices());
+ peer.send(peerName, msg);
+ }
+ LOG.info("Master: Broadcast updated weight matrix finishes.");
+
+ }
+
+ /**
+   * Update the weights with one local batch of training data.
+   * @param peer
+   * @return Whether the training data has been exhausted (i.e. whether to terminate).
+ * @throws IOException
+ */
+ private boolean updateWeights(
+ BSPPeer peer)
+ throws IOException {
+ // receive update message sent by master
+ if (peer.getNumCurrentMessages() > 0) {
+ SmallMLPMessage message = (SmallMLPMessage)peer.getCurrentMessage();
+ this.terminateTraining = message.isTerminated();
+      // each slave renews its weight matrices
+ this.inMemoryPerceptron.setWeightMatrices(message.getWeightsUpdatedMatrices());
+ if (this.terminateTraining) {
+ return true;
+ }
+ }
+
+ // update weight according to training data
+ DenseDoubleMatrix[] weightUpdates = this.getZeroWeightMatrices();
+
+ int count = 0;
+ LongWritable recordId = new LongWritable();
+ VectorWritable trainingInstance = new VectorWritable();
+ boolean hasMore = false;
+    while (count < this.batchSize) {
+      hasMore = peer.readNext(recordId, trainingInstance);
+      if (!hasMore) { // EOF reached, stop this batch before training on a stale instance
+        break;
+      }
+      ++count;
+
+      try {
+        DenseDoubleMatrix[] singleTrainingInstanceUpdates = this.inMemoryPerceptron.trainByInstance(trainingInstance.getVector());
+        // aggregate the updates
+        for (int m = 0; m < weightUpdates.length; ++m) {
+          weightUpdates[m] = (DenseDoubleMatrix)weightUpdates[m].add(singleTrainingInstanceUpdates[m]);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+      ++numTrainingInstanceRead;
+    }
+
+    // calculate the local mean (the mean of the local batch) of the weight updates
+    for (int m = 0; m < weightUpdates.length; ++m) {
+      weightUpdates[m] = (DenseDoubleMatrix)weightUpdates[m].divide(Math.max(count, 1));
+    }
+
+ LOG.info(String.format("Task %d has read %d records.\n", peer.getPeerIndex(), this.numTrainingInstanceRead));
+
+ // send the weight updates to master task
+ SmallMLPMessage message = new SmallMLPMessage(peer.getPeerIndex(), !hasMore, weightUpdates);
+ peer.send(peer.getPeerName(0), message); // send status to master
+
+ return !hasMore;
+ }
+
+ /**
+   * Create zero-filled matrices with the same dimensions as the model's weight matrices.
+ */
+ private DenseDoubleMatrix[] getZeroWeightMatrices() {
+ DenseDoubleMatrix[] weightUpdateCache = new DenseDoubleMatrix[this.layerSizeArray.length - 1];
+ // initialize weight matrix each layer
+ for (int i = 0; i < weightUpdateCache.length; ++i) {
+ weightUpdateCache[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1, this.layerSizeArray[i + 1]);
+ }
+ return weightUpdateCache;
+ }
+
+ /**
+   * Format the weight matrices as a string.
+   * @param mat The weight matrices.
+   * @return The string representation of the weight matrices.
+ */
+ private static String weightsToString(DenseDoubleMatrix[] mat) {
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < mat.length; ++i) {
+ sb.append(String.format("Matrix [%d]\n", i));
+ double[][] values = mat[i].getValues();
+ for (int d = 0; d < values.length; ++d) {
+ sb.append(Arrays.toString(values[d]));
+ sb.append('\n');
+ }
+ sb.append('\n');
+ }
+ return sb.toString();
+ }
+
+}
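
In formulas, the aggregation performed by updateWeights and mergeUpdate averages the per-instance updates over the local batch and then averages the local results over the tasks that reported (a sketch; B is the local batch size, P the number of messages received by the master, and the final assignment assumes updateWeightMatrices applies the merged update additively, which matches how the per-instance updates are signed in trainByInstance):

```latex
\Delta W_p = \frac{1}{B} \sum_{b=1}^{B} \Delta W_{p,b}, \qquad
\Delta W = \frac{1}{P} \sum_{p=1}^{P} \Delta W_p, \qquad
W \leftarrow W + \Delta W
```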
diff --git ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java
new file mode 100644
index 0000000..2727bcc
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java
@@ -0,0 +1,418 @@
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleMatrix;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.MatrixWritable;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.mortbay.log.Log;
+
+
+
+/**
+ * SmallMultiLayerPerceptron is a multilayer perceptron
+ * whose parameters can fit into the memory of a single machine.
+ * This kind of model can be trained and used more efficiently than
+ * the BigMultiLayerPerceptron, whose parameters are distributed
+ * across multiple machines.
+ *
+ * In general, it is a multilayer perceptron that consists
+ * of one input layer, multiple hidden layers and one output layer.
+ *
+ * The number of neurons in the input layer should be consistent with the
+ * number of features in the training instance.
+ * The number of neurons in the output layer should be consistent with the
+ * dimension of the class label vector.
+ */
+public final class SmallMultiLayerPerceptron extends MultiLayerPerceptron implements Writable {
+
+  /* The in-memory weight matrices */
+ private DenseDoubleMatrix[] weightMatrice;
+
+ /**
+ * {@inheritDoc}
+ */
+ public SmallMultiLayerPerceptron(double learningRate, boolean regularization,
+ double momentum, String squashingFunctionName, String costFunctionName, int[] layerSizeArray) {
+ super(learningRate, regularization, momentum,
+ squashingFunctionName, costFunctionName, layerSizeArray);
+ this.MLPType = "SmallMLP";
+ initializeWeightMatrix();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public SmallMultiLayerPerceptron(String modelPath) {
+ super(modelPath);
+ if (modelPath != null) {
+ try {
+ this.readFromModel();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+   * Initialize the weight matrices.
+   * Each weight is initialized uniformly at random in the range [-0.5, 0.5).
+ */
+ private void initializeWeightMatrix() {
+ this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+ // each layer contains one bias neuron
+ Random rnd = new Random();
+ for (int i = 0; i < this.numberOfLayers - 1; ++i) {
+ // add weights for bias
+ this.weightMatrice[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1, this.layerSizeArray[i + 1]);
+ int rowCount = this.weightMatrice[i].getRowCount();
+ int colCount = this.weightMatrice[i].getColumnCount();
+ for (int row = 0; row < rowCount; ++row) {
+ for (int col = 0; col < colCount; ++col) {
+ this.weightMatrice[i].set(row, col, rnd.nextDouble() - 0.5);
+ }
+ }
+ }
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ * The model meta-data is stored in memory.
+ */
+ public DoubleVector output(DoubleVector featureVector) throws Exception {
+    List<double[]> outputCache = this.outputInternal(featureVector);
+ // the output of the last layer is the output of the MLP
+ return new DenseDoubleVector(outputCache.get(outputCache.size() - 1));
+ }
+
+  private List<double[]> outputInternal(DoubleVector featureVector) throws Exception {
+
+    // store the output of the hidden layers and the output layer; each array stores one layer
+    List<double[]> outputCache = new ArrayList<double[]>();
+
+ // start from the first hidden layer
+ double[] intermediateResults = new double[this.layerSizeArray[0] + 1];
+ if (intermediateResults.length - 1 != featureVector.getDimension()) {
+ throw new Exception("Input feature dimension incorrect! The dimension of input layer is " +
+          this.layerSizeArray[0] + ", but the dimension of input feature is " + featureVector.getDimension());
+ }
+
+ // fill with input features
+ intermediateResults[0] = 1.0; // bias
+ for (int i = 0; i < featureVector.getDimension(); ++i) {
+ intermediateResults[i + 1] = featureVector.get(i);
+ }
+ outputCache.add(intermediateResults);
+
+ // forward the intermediate results to next layer
+ for (int fromLayer = 0; fromLayer < this.numberOfLayers - 1; ++fromLayer) {
+ intermediateResults = forward(fromLayer, intermediateResults);
+ outputCache.add(intermediateResults);
+ }
+
+ return outputCache;
+ }
+
+ /**
+   * Calculate the intermediate results of layer fromLayer + 1, i.e.
+   * o_j = f(sum_i w_ij * o_i) over the outputs o_i of layer fromLayer (including the bias).
+   * @param fromLayer The index of the layer that the intermediate results are forwarded from.
+   * @return The output of layer fromLayer + 1 (prefixed with a bias term unless it is the output layer).
+ */
+ private double[] forward(int fromLayer, double[] intermediateResult) {
+ int toLayer = fromLayer + 1;
+ double[] results = null;
+ int offset = 0;
+
+ if (toLayer < this.layerSizeArray.length - 1) { // add bias if it is not output layer
+ results = new double[this.layerSizeArray[toLayer] + 1];
+ offset = 1;
+ results[0] = 1.0; // the bias
+ }
+ else {
+ results = new double[this.layerSizeArray[toLayer]]; // no bias
+ }
+
+ for (int neuronIdx = 0; neuronIdx < this.layerSizeArray[toLayer]; ++neuronIdx) {
+ // aggregate the results from previous layer
+ for (int prevNeuronIdx = 0; prevNeuronIdx < this.layerSizeArray[fromLayer] + 1; ++prevNeuronIdx) {
+ results[neuronIdx + offset] += this.weightMatrice[fromLayer].get(prevNeuronIdx, neuronIdx)
+ * intermediateResult[prevNeuronIdx];
+ }
+ // calculate via squashing function
+ results[neuronIdx + offset] = this.squashingFunction.calculate(0, results[neuronIdx + offset]);
+ }
+
+ return results;
+ }
+
+ /**
+ * Get the updated weights using one training instance.
+   * @param trainingInstance The training instance, i.e. the concatenation of the feature vector and the class label vector.
+ * @return The update of each weight.
+ * @throws Exception
+ */
+ DenseDoubleMatrix[] trainByInstance(DoubleVector trainingInstance) throws Exception {
+ // initialize weight update matrices
+ DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.layerSizeArray.length - 1];
+ for (int m = 0; m < weightUpdateMatrices.length; ++m) {
+ weightUpdateMatrices[m] = new DenseDoubleMatrix(this.layerSizeArray[m] + 1, this.layerSizeArray[m + 1]);
+ }
+
+ if (trainingInstance == null) {
+ return weightUpdateMatrices;
+ }
+
+ double[] trainingVec = trainingInstance.toArray();
+ double[] trainingFeature = Arrays.copyOfRange(trainingVec, 0, this.layerSizeArray[0]);
+ double[] trainingLabels = Arrays.copyOfRange(trainingVec, this.layerSizeArray[0], trainingVec.length);
+
+ DoubleVector trainingFeatureVec = new DenseDoubleVector(trainingFeature);
+    List<double[]> outputCache = this.outputInternal(trainingFeatureVec);
+
+ // calculate the delta of output layer
+ double[] delta = new double[this.layerSizeArray[this.layerSizeArray.length - 1]];
+ double[] outputLayerOutput = outputCache.get(outputCache.size() - 1);
+ double[] lastHiddenLayerOutput = outputCache.get(outputCache.size() - 2);
+
+ for (int j = 0; j < delta.length; ++j) {
+ delta[j] = this.squashingFunction.getDerivative(outputLayerOutput[j]) *
+ this.costFunction.getPartialDerivative(trainingLabels[j], outputLayerOutput[j]);
+
+ // calculate the weight update matrix between the last hidden layer and the output layer
+ for (int i = 0; i < this.layerSizeArray[this.layerSizeArray.length - 2] + 1; ++i) {
+ double updatedValue = this.learningRate * delta[j] * lastHiddenLayerOutput[i];
+ weightUpdateMatrices[weightUpdateMatrices.length - 1].set(i, j, updatedValue);
+ }
+ }
+
+ // calculate the delta for each hidden layer through back-propagation
+ for (int l = this.layerSizeArray.length - 2; l >= 1; --l) {
+ delta = backpropagate(l, delta, outputCache, weightUpdateMatrices);
+ }
+
+ return weightUpdateMatrices;
+ }
+
+ /**
+   * Back-propagate the errors from the next layer to the current layer.
+   * For each non-bias neuron j of the current layer,
+   * delta_j = f'(o_j) * sum_k (w_jk * nextLayerDelta_k).
+   * The weight update information is accumulated in weightUpdateMatrices,
+   * and the delta of the current layer is returned.
+   *
+   * @param curLayerIdx The layer index of the current layer.
+   * @param nextLayerDelta The delta of the next layer.
+   * @param outputCache The cache of the outputs of all the layers.
+   * @param weightUpdateMatrices The weight update matrices.
+   * @return The delta of the current layer, used for the next iteration of back-propagation.
+   */
+  private double[] backpropagate(int curLayerIdx, double[] nextLayerDelta,
+      List<double[]> outputCache, DenseDoubleMatrix[] weightUpdateMatrices) {
+ int prevLayerIdx = curLayerIdx - 1;
+ double[] delta = new double[this.layerSizeArray[curLayerIdx]];
+ double[] curLayerOutput = outputCache.get(curLayerIdx);
+ double[] prevLayerOutput = outputCache.get(prevLayerIdx);
+
+ // for each neuron j in nextLayer, calculate the delta
+ for (int j = 0; j < delta.length; ++j) {
+ // aggregate delta from next layer
+ for (int k = 0; k < nextLayerDelta.length; ++k) {
+        // row 0 of the weight matrix holds the bias weights, so non-bias neuron j is at row j + 1
+        double weight = this.weightMatrice[curLayerIdx].get(j + 1, k);
+ delta[j] += weight * nextLayerDelta[k];
+ }
+ delta[j] *= this.squashingFunction.getDerivative(curLayerOutput[j + 1]);
+
+ // calculate the weight update matrix between the previous layer and the current layer
+ for (int i = 0; i < weightUpdateMatrices[prevLayerIdx].getRowCount(); ++i) {
+ double updatedValue = this.learningRate * delta[j] * prevLayerOutput[i];
+ weightUpdateMatrices[prevLayerIdx].set(i, j, updatedValue);
+ }
+ }
+
+ return delta;
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+  public void train(Path dataInputPath, Map<String, String> trainingParams)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ // create the BSP training job
+ Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : trainingParams.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ // put model related parameters
+ if (modelPath == null || modelPath.trim().length() == 0) { // build model from scratch
+ conf.set("MLPType", this.MLPType);
+ conf.set("learningRate", "" + this.learningRate);
+ conf.set("regularization", "" + this.regularization);
+ conf.set("momentum", "" + this.momentum);
+ conf.set("squashingFunctionName", this.squashingFunctionName);
+ conf.set("costFunctionName", this.costFunctionName);
+ StringBuilder layerSizeArraySb = new StringBuilder();
+ for (int layerSize : this.layerSizeArray) {
+ layerSizeArraySb.append(layerSize);
+ layerSizeArraySb.append(' ');
+ }
+ conf.set("layerSizeArray", layerSizeArraySb.toString());
+ }
+
+ HamaConfiguration hamaConf = new HamaConfiguration(conf);
+
+ BSPJob job = new BSPJob(hamaConf, SmallMLPTrainer.class);
+ job.setJobName("Small scale MLP training");
+ job.setJarByClass(SmallMLPTrainer.class);
+ job.setBspClass(SmallMLPTrainer.class);
+ job.setInputPath(dataInputPath);
+ job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(VectorWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+ int numTasks = conf.getInt("tasks", 1);
+ job.setNumBspTask(numTasks);
+ job.waitForCompletion(true);
+
+ // reload learned model
+ Log.info(String.format("Reload model from %s.", trainingParams.get("modelPath")));
+ this.modelPath = trainingParams.get("modelPath");
+ this.readFromModel();
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.MLPType = WritableUtils.readString(input);
+ this.learningRate = input.readDouble();
+ this.regularization = input.readBoolean();
+ this.momentum = input.readDouble();
+ this.numberOfLayers = input.readInt();
+ this.squashingFunctionName = WritableUtils.readString(input);
+ this.costFunctionName = WritableUtils.readString(input);
+ // read the number of neurons for each layer
+ this.layerSizeArray = new int[this.numberOfLayers];
+ for (int i = 0; i < numberOfLayers; ++i) {
+ this.layerSizeArray[i] = input.readInt();
+ }
+ this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+ for (int i = 0; i < numberOfLayers - 1; ++i) {
+ this.weightMatrice[i] = (DenseDoubleMatrix)MatrixWritable.read(input);
+ }
+ this.squashingFunction = SquashingFunctionFactory.getSquashingFunction(this.squashingFunctionName);
+ this.costFunction = CostFunctionFactory.getCostFunction(this.costFunctionName);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ WritableUtils.writeString(output, MLPType);
+ output.writeDouble(learningRate);
+ output.writeBoolean(regularization);
+ output.writeDouble(momentum);
+ output.writeInt(numberOfLayers);
+ WritableUtils.writeString(output, squashingFunctionName);
+ WritableUtils.writeString(output, costFunctionName);
+
+ // write the number of neurons for each layer
+ for (int i = 0; i
+ * cost(t, y) = 0.5 * (t - y)^2
+ *
+ */
+public class SquaredError extends CostFunction {
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public double getCost(double target, double actual) {
+ double diff = target - actual;
+ return 0.5 * diff * diff;
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public double getPartialDerivative(double target, double actual) {
+ return target - actual;
+ }
+
+}
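
A note on the sign convention (a sketch, assuming weight updates are added to the weights, as suggested by how trainByInstance builds them): getPartialDerivative returns t - y, the negative of the gradient of the cost with respect to y, so adding learningRate * delta * input to a weight moves it downhill on the squared error:

```latex
C(t, y) = \tfrac{1}{2}(t - y)^2, \qquad
\frac{\partial C}{\partial y} = y - t, \qquad
\texttt{getPartialDerivative}(t, y) = t - y = -\frac{\partial C}{\partial y}
```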
diff --git ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java
new file mode 100644
index 0000000..a7e1799
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java
@@ -0,0 +1,22 @@
+package org.apache.hama.ml.perception;
+
+import org.apache.hama.ml.math.DoubleVectorFunction;
+
+/**
+ * The squashing function to activate the neurons.
+ *
+ */
+public abstract class SquashingFunction implements DoubleVectorFunction {
+
+ /**
+ * Calculates the result with a given index and value of a vector.
+ */
+ public abstract double calculate(int index, double value);
+
+ /**
+   * Calculate the derivative of the squashing function given its output value.
+   * @param value The output of the squashing function.
+   * @return The derivative evaluated at the corresponding input.
+ */
+ public abstract double getDerivative(double value);
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java
new file mode 100644
index 0000000..a8a44c9
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java
@@ -0,0 +1,25 @@
+package org.apache.hama.ml.perception;
+
+/**
+ * Get the squashing function according to the name.
+ */
+public class SquashingFunctionFactory {
+
+ /**
+ * Get the squashing function instance according to the name.
+   * If no matched squashing function is found, return the Sigmoid
+ * squashing function by default.
+ * @param name The name of the squashing function.
+ * @return The instance of the squashing function.
+ */
+ public static SquashingFunction getSquashingFunction(String name) {
+ if (name.equalsIgnoreCase("Sigmoid")) {
+ return new Sigmoid();
+ }
+ else if (name.equalsIgnoreCase("Tanh")) {
+ return new Tanh();
+ }
+ return new Sigmoid();
+ }
+
+}
diff --git ml/src/main/java/org/apache/hama/ml/perception/Tanh.java ml/src/main/java/org/apache/hama/ml/perception/Tanh.java
new file mode 100644
index 0000000..bf4fbb5
--- /dev/null
+++ ml/src/main/java/org/apache/hama/ml/perception/Tanh.java
@@ -0,0 +1,19 @@
+package org.apache.hama.ml.perception;
+
+/**
+ * The hyperbolic tangent function.
+ * It is used as a squashing function in multi-layer perceptron.
+ */
+public class Tanh extends SquashingFunction {
+
+ @Override
+ public double calculate(int index, double value) {
+ return Math.tanh(value);
+ }
+
+ @Override
+  public double getDerivative(double value) {
+    // value is expected to be the tanh output; d/dz tanh(z) = 1 - tanh(z)^2
+    return 1 - value * value;
+ }
+
+}
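
As with Sigmoid, the derivative is written in terms of the squashed output v = tanh(z) (a sketch of the identity):

```latex
\frac{d}{dz}\tanh(z) = 1 - \tanh^2(z)
\;\;\Longrightarrow\;\;
\texttt{getDerivative}(v) = 1 - v^2, \quad v = \tanh(z)
```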
diff --git ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java
new file mode 100644
index 0000000..e631646
--- /dev/null
+++ ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java
@@ -0,0 +1,78 @@
+package org.apache.hama.ml.perception;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.junit.Test;
+
+/**
+ * Test the functionalities of SmallMLPMessage
+ *
+ */
+public class TestSmallMLPMessage {
+
+ @Test
+ public void testReadWrite() {
+ int owner = 101;
+ double[][] mat = {
+ {1, 2, 3},
+ {4, 5, 6},
+ {7, 8, 9}
+ };
+
+ double[][] mat2 = {
+ {10, 20},
+ {30, 40},
+ {50, 60}
+ };
+
+ double[][][] mats = {mat, mat2};
+
+ DenseDoubleMatrix[] matrices = new DenseDoubleMatrix[] {
+ new DenseDoubleMatrix(mat), new DenseDoubleMatrix(mat2)
+ };
+
+ SmallMLPMessage message = new SmallMLPMessage(owner, true, matrices);
+
+ Configuration conf = new Configuration();
+ String strPath = "testSmallMLPMessage";
+ Path path = new Path(strPath);
+ try {
+ FileSystem fs = FileSystem.get(new URI(strPath), conf);
+ FSDataOutputStream out = fs.create(path, true);
+ message.write(out);
+ out.close();
+
+ FSDataInputStream in = fs.open(path);
+ SmallMLPMessage outMessage = new SmallMLPMessage(0, false, null);
+ outMessage.readFields(in);
+
+ assertEquals(owner, outMessage.getOwner());
+ DenseDoubleMatrix[] outMatrices = outMessage.getWeightsUpdatedMatrices();
+ // check each matrix
+ for (int i = 0; i < outMatrices.length; ++i) {
+ double[][] outMat = outMessage.getWeightsUpdatedMatrices()[i].getValues();
+ for (int j = 0; j < outMat.length; ++j) {
+ assertArrayEquals(mats[i][j], outMat[j], 0.0001);
+ }
+ }
+
+ fs.delete(path, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
diff --git ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java
new file mode 100644
index 0000000..83cd39a
--- /dev/null
+++ ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java
@@ -0,0 +1,275 @@
+package org.apache.hama.ml.perception;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleMatrix;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.MatrixWritable;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+public class TestSmallMultiLayerPerceptron {
+
+ /**
+ * Write and read the parameters of MLP.
+ */
+ @Test
+ public void testWriteReadMLP() {
+ String modelPath = "sampleModel.data";
+ double learningRate = 0.5;
+ boolean regularization = false; // no regularization
+ double momentum = 0; // no momentum
+ String squashingFunctionName = "Sigmoid";
+ String costFunctionName = "SquaredError";
+ int[] layerSizeArray = new int[]{3, 2, 2, 3};
+ MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, regularization,
+ momentum, squashingFunctionName, costFunctionName, layerSizeArray);
+ try {
+ mlp.writeModelToFile(modelPath);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ // read the meta-data
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ mlp = new SmallMultiLayerPerceptron(modelPath);
+ assertEquals("SmallMLP", mlp.getMLPType());
+ assertEquals(learningRate, mlp.getLearningRate(), 0.001);
+ assertEquals(regularization, mlp.isRegularization());
+ assertEquals(layerSizeArray.length, mlp.getNumberOfLayers());
+ assertEquals(momentum, mlp.getMomentum(), 0.001);
+ assertEquals(squashingFunctionName, mlp.getSquashingFunctionName());
+ assertEquals(costFunctionName, mlp.getCostFunctionName());
+ assertArrayEquals(layerSizeArray, mlp.getLayerSizeArray());
+ // delete test file
+ fs.delete(new Path(modelPath), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Test the output of an example MLP.
+ */
+ @Test
+ public void testOutput() {
+ // write the MLP meta-data manually
+ String modelPath = "sampleModel.data";
+ Configuration conf = new Configuration();
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream output = fs.create(new Path(modelPath));
+
+ String MLPType = "SmallMLP";
+ double learningRate = 0.5;
+ boolean regularization = false;
+ double momentum = 0;
+ String squashingFunctionName = "Sigmoid";
+ String costFunctionName = "SquaredError";
+ int[] layerSizeArray = new int[] {3, 2, 3, 3};
+ int numberOfLayers = layerSizeArray.length;
+
+ WritableUtils.writeString(output, MLPType);
+ output.writeDouble(learningRate);
+ output.writeBoolean(regularization);
+ output.writeDouble(momentum);
+ output.writeInt(numberOfLayers);
+ WritableUtils.writeString(output, squashingFunctionName);
+ WritableUtils.writeString(output, costFunctionName);
+
+ // write the number of neurons for each layer
+ for (int i = 0; i < numberOfLayers; ++i) {
+ output.writeInt(layerSizeArray[i]);
+ }
+
+ double[][] matrix01 = { // 4 by 2
+ {0.5, 0.2},
+ {0.1, 0.1},
+ {0.2, 0.5},
+ {0.1, 0.5}
+ };
+
+ double[][] matrix12 = { // 3 by 3
+ {0.1, 0.2, 0.5},
+ {0.2, 0.5, 0.2},
+ {0.5, 0.5, 0.1}
+ };
+
+ double[][] matrix23 = { // 4 by 3
+ {0.2, 0.5, 0.2},
+ {0.5, 0.1, 0.5},
+ {0.1, 0.2, 0.1},
+ {0.1, 0.2, 0.5}
+ };
+
+ DoubleMatrix[] matrices = {new DenseDoubleMatrix(matrix01), new DenseDoubleMatrix(matrix12), new DenseDoubleMatrix(matrix23)};
+ for (DoubleMatrix mat : matrices) {
+ MatrixWritable.write(mat, output);
+ }
+ output.close();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // initial the mlp with existing model meta-data and get the output
+ MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(modelPath);
+ int[] sizes = mlp.getLayerSizeArray();
+ DoubleVector input = new DenseDoubleVector(new double[]{1, 2, 3});
+ try {
+ DoubleVector result = mlp.output(input);
+ assertArrayEquals(new double[]{0.6636557, 0.7009963, 0.7213835}, result.toArray(), 0.0001);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+
+ // delete meta-data
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path(modelPath), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Test the MLP on XOR problem.
+ */
+ @Test
+ public void testSingleInstanceTraining() {
+ // generate training data
+ DoubleVector[] trainingData = new DenseDoubleVector[] {
+ new DenseDoubleVector(new double[] {0, 0, 0}),
+ new DenseDoubleVector(new double[] {0, 1, 1}),
+ new DenseDoubleVector(new double[] {1, 0, 1}),
+ new DenseDoubleVector(new double[] {1, 1, 0})
+ };
+
+ // set parameters
+ double learningRate = 0.6;
+ boolean regularization = false; // no regularization
+ double momentum = 0; // no momentum
+ String squashingFunctionName = "Sigmoid";
+ String costFunctionName = "SquaredError";
+ int[] layerSizeArray = new int[]{2, 5, 1};
+ SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, regularization,
+ momentum, squashingFunctionName, costFunctionName, layerSizeArray);
+
+ try {
+ // train by multiple instances
+ Random rnd = new Random();
+ for (int i = 0; i < 30000; ++i) {
+ DenseDoubleMatrix[] weightUpdates = mlp.trainByInstance(trainingData[rnd.nextInt(4)]);
+ mlp.updateWeightMatrices(weightUpdates);
+ }
+
+// System.out.printf("Weight matrices: %s\n", mlp.weightsToString(mlp.getWeightMatrices()));
+ for (int i = 0; i < trainingData.length; ++i) {
+ DenseDoubleVector testVec = (DenseDoubleVector)trainingData[i].slice(2);
+ assertEquals(trainingData[i].toArray()[2], mlp.output(testVec).toArray()[0], 0.15);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Test the XOR problem.
+ */
+ @Test
+ public void testTrainingByXOR() {
+ // write in some training instances
+ Configuration conf = new Configuration();
+ String strDataPath = "hdfs://localhost:9000/tmp/xor";
+ Path dataPath = new Path(strDataPath);
+
+ // generate training data
+ DoubleVector[] trainingData = new DenseDoubleVector[] {
+ new DenseDoubleVector(new double[] {0, 0, 0}),
+ new DenseDoubleVector(new double[] {0, 1, 1}),
+ new DenseDoubleVector(new double[] {1, 0, 1}),
+ new DenseDoubleVector(new double[] {1, 1, 0})
+ };
+
+ try {
+ URI uri = new URI(strDataPath);
+ FileSystem hdfs = FileSystem.get(uri, conf);
+ hdfs.delete(dataPath, true);
+ if (!hdfs.exists(dataPath)) {
+ hdfs.createNewFile(dataPath);
+ SequenceFile.Writer writer = new SequenceFile.Writer(hdfs, conf, dataPath,
+ LongWritable.class, VectorWritable.class);
+
+ Random rnd = new Random();
+ for (int i = 0; i < 1000; ++i) {
+ VectorWritable vecWritable = new VectorWritable(trainingData[i % 4]);
+ writer.append(new LongWritable(i), vecWritable);
+ }
+ writer.close();
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // begin training
+ String modelPath = "xorModel.data";
+ double learningRate = 0.6;
+ boolean regularization = false; // no regularization
+ double momentum = 0; // no momentum
+ String squashingFunctionName = "Tanh";
+ String costFunctionName = "SquaredError";
+ int[] layerSizeArray = new int[]{2, 5, 1};
+ SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, regularization,
+ momentum, squashingFunctionName, costFunctionName, layerSizeArray);
+
+    Map<String, String> trainingParams = new HashMap<String, String>();
+ trainingParams.put("training.iteration", "10000");
+ trainingParams.put("training.mode", "minibatch.gradient.descent");
+ trainingParams.put("training.batch.size", "100");
+ trainingParams.put("tasks", "3");
+ trainingParams.put("modelPath", modelPath);
+
+ try {
+ mlp.train(dataPath, trainingParams);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // test the model
+ for (int i = 0; i < trainingData.length; ++i) {
+ DenseDoubleVector testVec = (DenseDoubleVector)trainingData[i].slice(2);
+ DenseDoubleVector expectedVec = (DenseDoubleVector)trainingData[i].slice(2, 3);
+ try {
+ DenseDoubleVector actual = (DenseDoubleVector)mlp.output(testVec);
+ assertEquals(trainingData[i].toArray()[2], actual.get(0), 0.15);
+ System.out.printf("Input: %s,\tExpected: %s,\tTest: %s\n", testVec, expectedVec, actual);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}