Index: ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/CostFunction.java (working copy)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The common base class for cost functions.
+ *
+ */
+public abstract class CostFunction {
+
+  /**
+   * Calculate the error between the target value and the actual value.
+   *
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The error.
+   */
+  public abstract double calculate(double target, double actual);
+
+  /**
+   * Calculate the partial derivative of the cost with respect to the actual
+   * value.
+   *
+   * @param target The target value.
+   * @param actual The actual value.
+   * @return The partial derivative.
+   */
+  public abstract double calculateDerivative(double target, double actual);
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/CostFunctionFactory.java (working copy)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The cost function factory that generates the cost function by name.
+ */
+public class CostFunctionFactory {
+
+  /**
+   * Get the cost function according to the name. If no matching cost function
+   * is found, return SquaredError by default.
+   *
+   * @param name The name of the cost function.
+   * @return The cost function instance.
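+   * For example, {@code getCostFunction("LogisticError")} returns a
+   * {@link LogisticCostFunction}, while an unrecognized name falls back to
+   * {@link SquaredError}.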
+ */ + public static CostFunction getCostFunction(String name) { + if (name.equalsIgnoreCase("SquaredError")) { + return new SquaredError(); + } else if (name.equalsIgnoreCase("LogisticError")) { + return new LogisticCostFunction(); + } + return new SquaredError(); + } +} Index: ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/perception/LogisticCostFunction.java (working copy) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.perception; + +/** + * The logistic cost function. + * + *
+ * cost(t, y) = - t * log(y) - (1 - t) * log(1 - y),
+ * where t denotes the target value and y denotes the estimated value.
+ * 
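+ * For example, with t = 1 and y = 0.8 the cost is -log(0.8), roughly 0.22;
+ * the cost approaches 0 as y approaches t, and grows without bound as y
+ * moves towards the opposite extreme.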
+ */
+public class LogisticCostFunction extends CostFunction {
+
+  @Override
+  public double calculate(double target, double actual) {
+    return -target * Math.log(actual) - (1 - target) * Math.log(1 - actual);
+  }
+
+  @Override
+  public double calculateDerivative(double target, double actual) {
+    double adjustedTarget = target;
+    double adjustedActual = actual;
+    // clamp the values away from 0 and 1 to avoid division by zero
+    if (adjustedActual == 1) {
+      adjustedActual = 0.999;
+    } else if (adjustedActual == 0) {
+      adjustedActual = 0.001;
+    }
+    if (adjustedTarget == 1) {
+      adjustedTarget = 0.999;
+    } else if (adjustedTarget == 0) {
+      adjustedTarget = 0.001;
+    }
+    // return the negative partial derivative, i.e. t / y - (1 - t) / (1 - y),
+    // matching the sign convention of SquaredError and the additive
+    // weight-update rule used during training
+    return adjustedTarget / adjustedActual - (1 - adjustedTarget)
+        / (1 - adjustedActual);
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/MLPMessage.java (working copy)
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * MLPMessage is used to hold the parameters that need to be sent between the
+ * tasks.
+ */
+public abstract class MLPMessage implements Writable {
+  protected boolean terminated;
+
+  public MLPMessage(boolean terminated) {
+    setTerminated(terminated);
+  }
+
+  public void setTerminated(boolean terminated) {
+    this.terminated = terminated;
+  }
+
+  public boolean isTerminated() {
+    return terminated;
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java (working copy)
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ml.math.DoubleVector;
+
+/**
+ * MultiLayerPerceptron defines the common behavior of all the concrete
+ * perceptrons.
+ */
+public abstract class MultiLayerPerceptron {
+
+  /* The trainer for the model */
+  protected PerceptronTrainer trainer;
+  /* The file path that contains the model meta-data */
+  protected String modelPath;
+
+  /* Model meta-data */
+  protected String MLPType;
+  protected double learningRate;
+  protected boolean regularization;
+  protected double momentum;
+  protected int numberOfLayers;
+  protected String squashingFunctionName;
+  protected String costFunctionName;
+  protected int[] layerSizeArray;
+
+  protected CostFunction costFunction;
+  protected SquashingFunction squashingFunction;
+
+  /**
+   * Initialize the MLP.
+   *
+   * @param learningRate A larger learning rate makes the MLP learn more
+   *          aggressively.
+   * @param regularization Turning on regularization makes the MLP less likely
+   *          to overfit.
+   * @param momentum Momentum makes previous weight adjustments contribute to
+   *          the current adjustment.
+   * @param squashingFunctionName The name of the squashing function.
+   * @param costFunctionName The name of the cost function.
+   * @param layerSizeArray The number of neurons for each layer. Note that each
+   *          layer except the output layer holds one additional bias neuron.
+   */
+  public MultiLayerPerceptron(double learningRate, boolean regularization,
+      double momentum, String squashingFunctionName, String costFunctionName,
+      int[] layerSizeArray) {
+    this.learningRate = learningRate;
+    this.regularization = regularization;
+    this.momentum = momentum;
+    this.squashingFunctionName = squashingFunctionName;
+    this.costFunctionName = costFunctionName;
+    this.layerSizeArray = layerSizeArray;
+    this.numberOfLayers = this.layerSizeArray.length;
+
+    this.costFunction = CostFunctionFactory
+        .getCostFunction(this.costFunctionName);
+    this.squashingFunction = SquashingFunctionFactory
+        .getSquashingFunction(this.squashingFunctionName);
+  }
+
+  /**
+   * Initialize a multi-layer perceptron with an existing model.
+   *
+   * @param modelPath Location of the existing model meta-data.
+   */
+  public MultiLayerPerceptron(String modelPath) {
+    this.modelPath = modelPath;
+  }
+
+  /**
+   * Train the model with the given data. This method invokes a perceptron
+   * training BSP task to train the model. It then writes the model to
+   * modelPath.
+   *
+   * @param dataInputPath The path of the data.
+   * @param trainingParams Extra parameters for training.
+   */
+  public abstract void train(Path dataInputPath,
+      Map<String, String> trainingParams) throws Exception;
+
+  /**
+   * Get the output based on the input instance and the learned model.
+   *
+   * @param featureVector The feature of an instance to feed the perceptron.
+   * @return The results.
+   */
+  public abstract DoubleVector output(DoubleVector featureVector)
+      throws Exception;
+
+  /**
+   * Read the model meta-data from the specified location.
+   *
+   * @throws IOException
+   */
+  protected abstract void readFromModel() throws IOException;
+
+  /**
+   * Write the model data to the specified location.
+   *
+   * @param modelPath The location in the file system to store the model.
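+   *          An existing file at this location will be overwritten.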
+ * @throws IOException + */ + public abstract void writeModelToFile(String modelPath) throws IOException; + + public String getModelPath() { + return modelPath; + } + + public String getMLPType() { + return MLPType; + } + + public double getLearningRate() { + return learningRate; + } + + public boolean isRegularization() { + return regularization; + } + + public double getMomentum() { + return momentum; + } + + public int getNumberOfLayers() { + return numberOfLayers; + } + + public String getSquashingFunctionName() { + return squashingFunctionName; + } + + public String getCostFunctionName() { + return costFunctionName; + } + + public int[] getLayerSizeArray() { + return layerSizeArray; + } + +} Index: ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/perception/PerceptronTrainer.java (working copy) @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.perception; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.ml.writable.VectorWritable; + +/** + * The trainer that is used to train the perceptron with BSP. The trainer would + * read the training data and obtain the trained parameters of the model. + * + */ +public abstract class PerceptronTrainer extends + BSP { + + protected Configuration conf; + protected int maxIteration; + protected int batchSize; + protected String trainingMode; + + @Override + public void setup( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + conf = peer.getConfiguration(); + trainingMode = conf.get("training.mode"); + batchSize = conf.getInt("training.batch.size", 100); // mini-batch by + // default + this.extraSetup(peer); + } + + /** + * Handle extra setup for sub-classes. + * + * @param peer + * @throws IOException + * @throws SyncException + * @throws InterruptedException + */ + protected void extraSetup( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + } + + /** + * {@inheritDoc} + */ + @Override + public abstract void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException; + + @Override + public void cleanup( + BSPPeer peer) + throws IOException { + + this.extraCleanup(peer); + } + + /** + * Handle extra cleanup for sub-classes. 
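+   * The default implementation does nothing.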
+   *
+   * @param peer
+   * @throws IOException
+   */
+  protected void extraCleanup(
+      BSPPeer peer)
+      throws IOException {
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/Sigmoid.java (working copy)
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+
+/**
+ * The Sigmoid function.
+ *
+ *
+ * f(z) = 1 / (1 + e^{-z})
+ * 
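+ * Note that f'(z) = f(z) * (1 - f(z)), so calculateDerivative expects the
+ * already-squashed output value as its argument.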
+ */
+public class Sigmoid extends SquashingFunction {
+
+  @Override
+  public double calculate(int index, double value) {
+    return 1.0 / (1 + Math.exp(-value));
+  }
+
+  @Override
+  public double calculateDerivative(double value) {
+    return value * (1 - value);
+  }
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/SmallMLPMessage.java (working copy)
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.writable.MatrixWritable;
+
+/**
+ * SmallMLPMessage is used to exchange information for the
+ * {@link SmallMultiLayerPerceptron}. It sends the whole weight matrices from
+ * one task to another.
+ */
+public class SmallMLPMessage extends MLPMessage {
+
+  private int owner; // the ID of the task that created the message
+  private DenseDoubleMatrix[] weightUpdatedMatrices;
+  private int numOfMatrices;
+
+  public SmallMLPMessage(int owner, boolean terminated, DenseDoubleMatrix[] mat) {
+    super(terminated);
+    this.owner = owner;
+    this.weightUpdatedMatrices = mat;
+    this.numOfMatrices = this.weightUpdatedMatrices == null ? 0
+        : this.weightUpdatedMatrices.length;
+  }
+
+  /**
+   * Get the owner task ID of the message.
+   *
+   * @return The owner task ID.
+   */
+  public int getOwner() {
+    return owner;
+  }
+
+  /**
+   * Get the updated weight matrices.
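+   * There is one matrix for each pair of adjacent layers.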
+ * + * @return + */ + public DenseDoubleMatrix[] getWeightsUpdatedMatrices() { + return this.weightUpdatedMatrices; + } + + @Override + public void readFields(DataInput input) throws IOException { + this.owner = input.readInt(); + this.terminated = input.readBoolean(); + this.numOfMatrices = input.readInt(); + this.weightUpdatedMatrices = new DenseDoubleMatrix[this.numOfMatrices]; + for (int i = 0; i < this.numOfMatrices; ++i) { + this.weightUpdatedMatrices[i] = (DenseDoubleMatrix) MatrixWritable + .read(input); + } + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(this.owner); + output.writeBoolean(this.terminated); + output.writeInt(this.numOfMatrices); + for (int i = 0; i < this.numOfMatrices; ++i) { + MatrixWritable.write(this.weightUpdatedMatrices[i], output); + } + } + +} Index: ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java (working copy) @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.perception; + +import java.io.IOException; +import java.util.Arrays; +import java.util.BitSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.writable.VectorWritable; + +/** + * The perceptron trainer for small scale MLP. 
+ */
+public class SmallMLPTrainer extends PerceptronTrainer {
+
+  private static final Log LOG = LogFactory.getLog(SmallMLPTrainer.class);
+  /* used by the master only, to check whether all slaves have finished reading */
+  private BitSet statusSet;
+
+  private int numTrainingInstanceRead = 0;
+  /* Once the reader reaches EOF, the training procedure is terminated */
+  private boolean terminateTraining = false;
+
+  private SmallMultiLayerPerceptron inMemoryPerceptron;
+
+  private int[] layerSizeArray;
+
+  @Override
+  protected void extraSetup(
+      BSPPeer peer) {
+
+    this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+
+    String outputModelPath = conf.get("modelPath");
+    if (outputModelPath == null || outputModelPath.trim().length() == 0) {
+      throw new IllegalArgumentException("Please specify output model path.");
+    }
+
+    String modelPath = conf.get("existingModelPath");
+    // build model from scratch
+    if (modelPath == null || modelPath.trim().length() == 0) {
+      double learningRate = Double.parseDouble(conf.get("learningRate"));
+      boolean regularization = Boolean.parseBoolean(conf.get("regularization"));
+      double momentum = Double.parseDouble(conf.get("momentum"));
+      String squashingFunctionName = conf.get("squashingFunctionName");
+      String costFunctionName = conf.get("costFunctionName");
+      String[] layerSizeArrayStr = conf.get("layerSizeArray").trim().split(" ");
+      this.layerSizeArray = new int[layerSizeArrayStr.length];
+      for (int i = 0; i < this.layerSizeArray.length; ++i) {
+        this.layerSizeArray[i] = Integer.parseInt(layerSizeArrayStr[i]);
+      }
+
+      this.inMemoryPerceptron = new SmallMultiLayerPerceptron(learningRate,
+          regularization, momentum, squashingFunctionName, costFunctionName,
+          layerSizeArray);
+      LOG.info("Training model from scratch.");
+    } else { // read model from existing data
+      this.inMemoryPerceptron = new SmallMultiLayerPerceptron(modelPath);
+      LOG.info("Training with existing model.");
+    }
+
+  }
+
+  @Override
+  protected void extraCleanup(
+      BSPPeer peer) {
+    LOG.info(String.format("Task %d read %d records in total.\n",
+        peer.getPeerIndex(), this.numTrainingInstanceRead));
+    // the master writes the learned model to disk
+    if (peer.getPeerIndex() == 0) {
+      try {
+        LOG.info(String.format("Master writes the learned model to %s\n",
+            conf.get("modelPath")));
+        this.inMemoryPerceptron.writeModelToFile(conf.get("modelPath"));
+      } catch (IOException e) {
+        System.err.println("Please set a correct model path.");
+      }
+    }
+  }
+
+  @Override
+  public void bsp(
+      BSPPeer peer)
+      throws IOException, SyncException, InterruptedException {
+    LOG.info("Start training...");
+    if (trainingMode.equalsIgnoreCase("minibatch.gradient.descent")) {
+      LOG.info("Training Mode: minibatch.gradient.descent");
+      trainByMinibatch(peer);
+    }
+
+    LOG.info(String.format("Task %d finished.", peer.getPeerIndex()));
+  }
+
+  /**
+   * Train the MLP with mini-batch gradient descent.
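+   * In each iteration, every task computes weight updates from one batch of
+   * its local data, and the master task merges them.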
+   *
+   * @param peer
+   * @throws IOException
+   * @throws SyncException
+   * @throws InterruptedException
+   */
+  private void trainByMinibatch(
+      BSPPeer peer)
+      throws IOException, SyncException, InterruptedException {
+
+    int maxIteration = conf.getInt("training.iteration", 1);
+    LOG.info("# of Training Iteration: " + maxIteration);
+
+    for (int i = 0; i < maxIteration; ++i) {
+      if (peer.getPeerIndex() == 0) {
+        LOG.info(String.format("Iteration [%d] begins...", i));
+      }
+      peer.reopenInput();
+      // reset status
+      if (peer.getPeerIndex() == 0) {
+        this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1));
+      }
+      this.terminateTraining = false;
+      peer.sync();
+      while (true) {
+        // each slave task updates weights according to its training data
+        boolean terminate = updateWeights(peer);
+        peer.sync();
+
+        // the master merges the updates
+        if (peer.getPeerIndex() == 0) {
+          mergeUpdate(peer);
+        }
+        peer.sync();
+
+        if (terminate) {
+          break;
+        }
+      }
+
+    }
+
+  }
+
+  /**
+   * Merge the updates from the slave tasks.
+   *
+   * @param peer
+   * @throws IOException
+   */
+  private void mergeUpdate(
+      BSPPeer peer)
+      throws IOException {
+    // initialize the cache
+    DenseDoubleMatrix[] mergedUpdates = this.getZeroWeightMatrices();
+
+    int numOfPartitions = peer.getNumCurrentMessages();
+
+    // aggregate the weight updates
+    while (peer.getNumCurrentMessages() > 0) {
+      SmallMLPMessage message = (SmallMLPMessage) peer.getCurrentMessage();
+      if (message.isTerminated()) {
+        this.statusSet.set(message.getOwner());
+      }
+
+      DenseDoubleMatrix[] weightUpdates = message.getWeightsUpdatedMatrices();
+      for (int m = 0; m < mergedUpdates.length; ++m) {
+        mergedUpdates[m] = (DenseDoubleMatrix) mergedUpdates[m]
+            .add(weightUpdates[m]);
+      }
+    }
+
+    if (numOfPartitions != 0) {
+      // calculate the global mean (the mean of batches from all slave tasks)
+      // of the weight updates
+      for (int m = 0; m < mergedUpdates.length; ++m) {
+        mergedUpdates[m] = (DenseDoubleMatrix) mergedUpdates[m]
+            .divide(numOfPartitions);
+      }
+
+      // check whether all tasks have finished reading data
+      if (this.statusSet.cardinality() == conf.getInt("tasks", 1)) {
+        this.terminateTraining = true;
+      }
+
+      // update the weight matrices
+      this.inMemoryPerceptron.updateWeightMatrices(mergedUpdates);
+    }
+
+    // broadcast the updated weight matrices
+    for (String peerName : peer.getAllPeerNames()) {
+      SmallMLPMessage msg = new SmallMLPMessage(peer.getPeerIndex(),
+          this.terminateTraining, this.inMemoryPerceptron.getWeightMatrices());
+      peer.send(peerName, msg);
+    }
+
+  }
+
+  /**
+   * Update the weights using one batch of training data.
+   *
+   * @param peer
+   * @return Whether the training should terminate.
+ * @throws IOException + */ + private boolean updateWeights( + BSPPeer peer) + throws IOException { + // receive update message sent by master + if (peer.getNumCurrentMessages() > 0) { + SmallMLPMessage message = (SmallMLPMessage) peer.getCurrentMessage(); + this.terminateTraining = message.isTerminated(); + // each slave renew its weight matrices + this.inMemoryPerceptron.setWeightMatrices(message + .getWeightsUpdatedMatrices()); + if (this.terminateTraining) { + return true; + } + } + + // update weight according to training data + DenseDoubleMatrix[] weightUpdates = this.getZeroWeightMatrices(); + + int count = 0; + LongWritable recordId = new LongWritable(); + VectorWritable trainingInstance = new VectorWritable(); + boolean hasMore = false; + while (count++ < this.batchSize) { + hasMore = peer.readNext(recordId, trainingInstance); + + try { + DenseDoubleMatrix[] singleTrainingInstanceUpdates = this.inMemoryPerceptron + .trainByInstance(trainingInstance.getVector()); + // aggregate the updates + for (int m = 0; m < weightUpdates.length; ++m) { + weightUpdates[m] = (DenseDoubleMatrix) weightUpdates[m] + .add(singleTrainingInstanceUpdates[m]); + } + } catch (Exception e) { + e.printStackTrace(); + } + + ++numTrainingInstanceRead; + if (!hasMore) { + break; + } + } + + // calculate the local mean (the mean of the local batch) of weight updates + for (int m = 0; m < weightUpdates.length; ++m) { + weightUpdates[m] = (DenseDoubleMatrix) weightUpdates[m].divide(count); + } + + LOG.info(String.format("Task %d has read %d records.", + peer.getPeerIndex(), this.numTrainingInstanceRead)); + + // send the weight updates to master task + SmallMLPMessage message = new SmallMLPMessage(peer.getPeerIndex(), + !hasMore, weightUpdates); + peer.send(peer.getPeerName(0), message); // send status to master + + return !hasMore; + } + + /** + * Initialize the weight matrices. + */ + private DenseDoubleMatrix[] getZeroWeightMatrices() { + DenseDoubleMatrix[] weightUpdateCache = new DenseDoubleMatrix[this.layerSizeArray.length - 1]; + // initialize weight matrix each layer + for (int i = 0; i < weightUpdateCache.length; ++i) { + weightUpdateCache[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1, + this.layerSizeArray[i + 1]); + } + return weightUpdateCache; + } + + /** + * Print out the weights. + * + * @param mat + * @return + */ + protected static String weightsToString(DenseDoubleMatrix[] mat) { + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < mat.length; ++i) { + sb.append(String.format("Matrix [%d]\n", i)); + double[][] values = mat[i].getValues(); + for (int d = 0; d < values.length; ++d) { + sb.append(Arrays.toString(values[d])); + sb.append('\n'); + } + sb.append('\n'); + } + return sb.toString(); + } + +} Index: ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java (working copy) @@ -0,0 +1,465 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.MatrixWritable;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.mortbay.log.Log;
+
+/**
+ * SmallMultiLayerPerceptron is a kind of multilayer perceptron whose
+ * parameters can be fit into the memory of a single machine. This kind of
+ * model can be trained and used more efficiently than the
+ * BigMultiLayerPerceptron, whose parameters are distributed across multiple
+ * machines.
+ *
+ * In general, it is a multilayer perceptron that consists of one input layer,
+ * multiple hidden layers and one output layer.
+ *
+ * The number of neurons in the input layer should be consistent with the
+ * number of features in the training instance. The number of neurons in the
+ * output layer should be consistent with the dimension of the label part of
+ * the training instance.
+ */
+public final class SmallMultiLayerPerceptron extends MultiLayerPerceptron
+    implements Writable {
+
+  /* The in-memory weight matrices */
+  private DenseDoubleMatrix[] weightMatrice;
+
+  /**
+   * {@inheritDoc}
+   */
+  public SmallMultiLayerPerceptron(double learningRate, boolean regularization,
+      double momentum, String squashingFunctionName, String costFunctionName,
+      int[] layerSizeArray) {
+    super(learningRate, regularization, momentum, squashingFunctionName,
+        costFunctionName, layerSizeArray);
+    this.MLPType = "SmallMLP";
+    initializeWeightMatrix();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public SmallMultiLayerPerceptron(String modelPath) {
+    super(modelPath);
+    if (modelPath != null) {
+      try {
+        this.readFromModel();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Initialize the weight matrices randomly.
Each weight is
+   * drawn uniformly from the range (-0.5, 0.5).
+   */
+  private void initializeWeightMatrix() {
+    this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+    // each layer contains one bias neuron
+    Random rnd = new Random();
+    for (int i = 0; i < this.numberOfLayers - 1; ++i) {
+      // add weights for bias
+      this.weightMatrice[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1,
+          this.layerSizeArray[i + 1]);
+      int rowCount = this.weightMatrice[i].getRowCount();
+      int colCount = this.weightMatrice[i].getColumnCount();
+      for (int row = 0; row < rowCount; ++row) {
+        for (int col = 0; col < colCount; ++col) {
+          this.weightMatrice[i].set(row, col, rnd.nextDouble() - 0.5);
+        }
+      }
+    }
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   * The model meta-data is stored in memory.
+   */
+  public DoubleVector output(DoubleVector featureVector) throws Exception {
+    List<double[]> outputCache = this.outputInternal(featureVector);
+    // the output of the last layer is the output of the MLP
+    return new DenseDoubleVector(outputCache.get(outputCache.size() - 1));
+  }
+
+  private List<double[]> outputInternal(DoubleVector featureVector)
+      throws Exception {
+
+    // store the output of the hidden layers and output layer, each array
+    // stores one layer
+    List<double[]> outputCache = new ArrayList<double[]>();
+
+    // start from the first hidden layer
+    double[] intermediateResults = new double[this.layerSizeArray[0] + 1];
+    if (intermediateResults.length - 1 != featureVector.getDimension()) {
+      throw new Exception(
+          "Input feature dimension incorrect! The dimension of the input layer is "
+              + this.layerSizeArray[0]
+              + ", but the dimension of the input feature is "
+              + featureVector.getDimension());
+    }
+
+    // fill with input features
+    intermediateResults[0] = 1.0; // bias
+    for (int i = 0; i < featureVector.getDimension(); ++i) {
+      intermediateResults[i + 1] = featureVector.get(i);
+    }
+    outputCache.add(intermediateResults);
+
+    // forward the intermediate results to the next layer
+    for (int fromLayer = 0; fromLayer < this.numberOfLayers - 1; ++fromLayer) {
+      intermediateResults = forward(fromLayer, intermediateResults);
+      outputCache.add(intermediateResults);
+    }
+
+    return outputCache;
+  }
+
+  /**
+   * Calculate the intermediate results of layer fromLayer + 1.
+   *
+   * @param fromLayer The index of the layer that the intermediate results are
+   *          forwarded from.
+   * @return The intermediate results of layer fromLayer + 1.
+   */
+  private double[] forward(int fromLayer, double[] intermediateResult) {
+    int toLayer = fromLayer + 1;
+    double[] results = null;
+    int offset = 0;
+
+    if (toLayer < this.layerSizeArray.length - 1) { // add bias if it is not
+                                                    // the output layer
+      results = new double[this.layerSizeArray[toLayer] + 1];
+      offset = 1;
+      results[0] = 1.0; // the bias
+    } else {
+      results = new double[this.layerSizeArray[toLayer]]; // no bias
+    }
+
+    for (int neuronIdx = 0; neuronIdx < this.layerSizeArray[toLayer]; ++neuronIdx) {
+      // aggregate the results from the previous layer
+      for (int prevNeuronIdx = 0; prevNeuronIdx < this.layerSizeArray[fromLayer] + 1; ++prevNeuronIdx) {
+        results[neuronIdx + offset] += this.weightMatrice[fromLayer].get(
+            prevNeuronIdx, neuronIdx) * intermediateResult[prevNeuronIdx];
+      }
+      // apply the squashing function
+      results[neuronIdx + offset] = this.squashingFunction.calculate(0,
+          results[neuronIdx + offset]);
+    }
+
+    return results;
+  }
+
+  /**
+   * Get the weight updates computed from one training instance.
+   *
+   * @param trainingInstance The trainingInstance is the concatenation of the
+   *          feature vector and the class label vector.
+   * @return The update for each weight.
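+   *         The updates are only computed here; the caller is responsible
+   *         for merging and applying them.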
+   * @throws Exception
+   */
+  DenseDoubleMatrix[] trainByInstance(DoubleVector trainingInstance)
+      throws Exception {
+    // initialize weight update matrices
+    DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.layerSizeArray.length - 1];
+    for (int m = 0; m < weightUpdateMatrices.length; ++m) {
+      weightUpdateMatrices[m] = new DenseDoubleMatrix(
+          this.layerSizeArray[m] + 1, this.layerSizeArray[m + 1]);
+    }
+
+    if (trainingInstance == null) {
+      return weightUpdateMatrices;
+    }
+
+    double[] trainingVec = trainingInstance.toArray();
+    double[] trainingFeature = Arrays.copyOfRange(trainingVec, 0,
+        this.layerSizeArray[0]);
+    double[] trainingLabels = Arrays.copyOfRange(trainingVec,
+        this.layerSizeArray[0], trainingVec.length);
+
+    DoubleVector trainingFeatureVec = new DenseDoubleVector(trainingFeature);
+    List<double[]> outputCache = this.outputInternal(trainingFeatureVec);
+
+    // calculate the delta of the output layer
+    double[] delta = new double[this.layerSizeArray[this.layerSizeArray.length - 1]];
+    double[] outputLayerOutput = outputCache.get(outputCache.size() - 1);
+    double[] lastHiddenLayerOutput = outputCache.get(outputCache.size() - 2);
+
+    for (int j = 0; j < delta.length; ++j) {
+      delta[j] = this.squashingFunction
+          .calculateDerivative(outputLayerOutput[j])
+          * this.costFunction.calculateDerivative(trainingLabels[j],
+              outputLayerOutput[j]);
+
+      // calculate the weight update matrix between the last hidden layer and
+      // the output layer
+      for (int i = 0; i < this.layerSizeArray[this.layerSizeArray.length - 2] + 1; ++i) {
+        double updatedValue = this.learningRate * delta[j]
+            * lastHiddenLayerOutput[i];
+        weightUpdateMatrices[weightUpdateMatrices.length - 1].set(i, j,
+            updatedValue);
+      }
+    }
+
+    // calculate the delta for each hidden layer through back-propagation
+    for (int l = this.layerSizeArray.length - 2; l >= 1; --l) {
+      delta = backpropagate(l, delta, outputCache, weightUpdateMatrices);
+    }
+
+    return weightUpdateMatrices;
+  }
+
+  /**
+   * Back-propagate the errors from the next layer to the current layer. The
+   * weight update information is stored in weightUpdateMatrices, and the delta
+   * of the current layer is returned.
+   *
+   * @param curLayerIdx The layer index of the current layer.
+   * @param nextLayerDelta The delta of the next layer.
+   * @param outputCache The cache of the output of all the layers.
+   * @param weightUpdateMatrices The weight update matrices.
+   * @return The delta of the current layer, to be used in the next iteration
+   *         of back-propagation.
+   */
+  private double[] backpropagate(int curLayerIdx, double[] nextLayerDelta,
+      List<double[]> outputCache, DenseDoubleMatrix[] weightUpdateMatrices) {
+    int prevLayerIdx = curLayerIdx - 1;
+    double[] delta = new double[this.layerSizeArray[curLayerIdx]];
+    double[] curLayerOutput = outputCache.get(curLayerIdx);
+    double[] prevLayerOutput = outputCache.get(prevLayerIdx);
+
+    // for each neuron j in the current layer, calculate the delta
+    for (int j = 0; j < delta.length; ++j) {
+      // aggregate the delta from the next layer
+      for (int k = 0; k < nextLayerDelta.length; ++k) {
+        double weight = this.weightMatrice[curLayerIdx].get(j, k);
+        delta[j] += weight * nextLayerDelta[k];
+      }
+      delta[j] *= this.squashingFunction
+          .calculateDerivative(curLayerOutput[j + 1]);
+
+      // calculate the weight update matrix between the previous layer and the
+      // current layer
+      for (int i = 0; i < weightUpdateMatrices[prevLayerIdx].getRowCount(); ++i) {
+        double updatedValue = this.learningRate * delta[j] * prevLayerOutput[i];
+        weightUpdateMatrices[prevLayerIdx].set(i, j, updatedValue);
+      }
+    }
+
+    return delta;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void train(Path dataInputPath, Map<String, String> trainingParams)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    // create the BSP training job
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> entry : trainingParams.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+
+    // put model related parameters
+    if (modelPath == null || modelPath.trim().length() == 0) { // build model
+                                                               // from scratch
+      conf.set("MLPType", this.MLPType);
+      conf.set("learningRate", "" + this.learningRate);
+      conf.set("regularization", "" + this.regularization);
+      conf.set("momentum", "" + this.momentum);
+      conf.set("squashingFunctionName", this.squashingFunctionName);
+      conf.set("costFunctionName", this.costFunctionName);
+      StringBuilder layerSizeArraySb = new StringBuilder();
+      for (int layerSize : this.layerSizeArray) {
+        layerSizeArraySb.append(layerSize);
+        layerSizeArraySb.append(' ');
+      }
+      conf.set("layerSizeArray", layerSizeArraySb.toString());
+    }
+
+    HamaConfiguration hamaConf = new HamaConfiguration(conf);
+
+    BSPJob job = new BSPJob(hamaConf, SmallMLPTrainer.class);
+    job.setJobName("Small scale MLP training");
+    job.setJarByClass(SmallMLPTrainer.class);
+    job.setBspClass(SmallMLPTrainer.class);
+    job.setInputPath(dataInputPath);
+    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(VectorWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+    int numTasks = conf.getInt("tasks", 1);
+    job.setNumBspTask(numTasks);
+    job.waitForCompletion(true);
+
+    // reload the learned model
+    Log.info(String.format("Reload model from %s.",
+        trainingParams.get("modelPath")));
+    this.modelPath = trainingParams.get("modelPath");
+    this.readFromModel();
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    this.MLPType = WritableUtils.readString(input);
+    this.learningRate = input.readDouble();
+    this.regularization = input.readBoolean();
+    this.momentum = input.readDouble();
+    this.numberOfLayers = input.readInt();
+    this.squashingFunctionName = WritableUtils.readString(input);
+    this.costFunctionName = WritableUtils.readString(input);
+    // read the number of neurons for each layer
+    this.layerSizeArray = new int[this.numberOfLayers];
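+    // read the layer sizes in the same order as they are written by
+    // write(DataOutput)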
+    for (int i = 0; i < numberOfLayers; ++i) {
+      this.layerSizeArray[i] = input.readInt();
+    }
+    this.weightMatrice = new DenseDoubleMatrix[this.numberOfLayers - 1];
+    for (int i = 0; i < numberOfLayers - 1; ++i) {
+      this.weightMatrice[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+    }
+    this.squashingFunction = SquashingFunctionFactory
+        .getSquashingFunction(this.squashingFunctionName);
+    this.costFunction = CostFunctionFactory
+        .getCostFunction(this.costFunctionName);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    WritableUtils.writeString(output, MLPType);
+    output.writeDouble(learningRate);
+    output.writeBoolean(regularization);
+    output.writeDouble(momentum);
+    output.writeInt(numberOfLayers);
+    WritableUtils.writeString(output, squashingFunctionName);
+    WritableUtils.writeString(output, costFunctionName);
+
+    // write the number of neurons for each layer
+    for (int i = 0; i < this.numberOfLayers; ++i) {
+      output.writeInt(this.layerSizeArray[i]);
+    }
+    for (int i = 0; i < numberOfLayers - 1; ++i) {
+      MatrixWritable matrixWritable = new MatrixWritable(this.weightMatrice[i]);
+      matrixWritable.write(output);
+    }
+  }
+
+  /**
+   * Read the model meta-data from the specified location.
+   *
+   * @throws IOException
+   */
+  @Override
+  protected void readFromModel() throws IOException {
+    Configuration conf = new Configuration();
+    try {
+      URI uri = new URI(modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
+      this.readFields(is);
+      is.close(); // close the stream once the model has been read
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Write the model to the given file.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void writeModelToFile(String modelPath) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream stream = fs.create(new Path(modelPath), true);
+    this.write(stream);
+    stream.close();
+  }
+
+  DenseDoubleMatrix[] getWeightMatrices() {
+    return this.weightMatrice;
+  }
+
+  void setWeightMatrices(DenseDoubleMatrix[] newMatrices) {
+    this.weightMatrice = newMatrices;
+  }
+
+  /**
+   * Update the weight matrices with the given updates.
+   *
+   * @param updateMatrices The weight updates in matrix form.
+   */
+  void updateWeightMatrices(DenseDoubleMatrix[] updateMatrices) {
+    for (int m = 0; m < this.weightMatrice.length; ++m) {
+      this.weightMatrice[m] = (DenseDoubleMatrix) this.weightMatrice[m]
+          .add(updateMatrices[m]);
+    }
+  }
+
+  /**
+   * Format the weights as a string.
+   *
+   * @param mat
+   * @return The string representation of the weight matrices.
+   */
+  static String weightsToString(DenseDoubleMatrix[] mat) {
+    StringBuilder sb = new StringBuilder();
+
+    for (int i = 0; i < mat.length; ++i) {
+      sb.append(String.format("Matrix [%d]\n", i));
+      double[][] values = mat[i].getValues();
+      for (int d = 0; d < values.length; ++d) {
+        sb.append(Arrays.toString(values[d]));
+        sb.append('\n');
+      }
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/SquaredError.java (working copy)
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * Squared error cost function.
+ *
+ *
+ * cost(t, y) = 0.5 * (t - y)^2
+ * 
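+ * Its derivative, calculateDerivative, returns (t - y), i.e. the negative of
+ * d cost / d y, so that the resulting weight updates can simply be added to
+ * the current weights.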
+ */
+public class SquaredError extends CostFunction {
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public double calculate(double target, double actual) {
+    double diff = target - actual;
+    return 0.5 * diff * diff;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public double calculateDerivative(double target, double actual) {
+    return target - actual;
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/SquashingFunction.java (working copy)
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+import org.apache.hama.ml.math.DoubleVectorFunction;
+
+/**
+ * The squashing function used to activate the neurons.
+ *
+ */
+public abstract class SquashingFunction implements DoubleVectorFunction {
+
+  /**
+   * Calculates the result with a given index and value of a vector.
+   */
+  @Override
+  public abstract double calculate(int index, double value);
+
+  /**
+   * Calculate the derivative of the squashing function at the given value.
+   *
+   * @param value The already-squashed output value.
+   * @return The derivative.
+   */
+  public abstract double calculateDerivative(double value);
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/SquashingFunctionFactory.java (working copy)
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The squashing function factory that generates the squashing function by
+ * name.
+ */
+public class SquashingFunctionFactory {
+
+  /**
+   * Get the squashing function instance according to the name.
If no matching
+   * squashing function is found, return the Sigmoid squashing function by
+   * default.
+   *
+   * @param name The name of the squashing function.
+   * @return The instance of the squashing function.
+   */
+  public static SquashingFunction getSquashingFunction(String name) {
+    if (name.equalsIgnoreCase("Sigmoid")) {
+      return new Sigmoid();
+    } else if (name.equalsIgnoreCase("Tanh")) {
+      return new Tanh();
+    }
+    return new Sigmoid();
+  }
+
+}
Index: ml/src/main/java/org/apache/hama/ml/perception/Tanh.java
===================================================================
--- ml/src/main/java/org/apache/hama/ml/perception/Tanh.java (revision 0)
+++ ml/src/main/java/org/apache/hama/ml/perception/Tanh.java (working copy)
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.perception;
+
+/**
+ * The hyperbolic tangent function. It is used as a squashing function in the
+ * multi-layer perceptron.
+ */
+public class Tanh extends SquashingFunction {
+
+  @Override
+  public double calculate(int index, double value) {
+    return Math.tanh(value);
+  }
+
+  @Override
+  public double calculateDerivative(double value) {
+    // value is the already-squashed output; tanh'(z) = 1 - tanh(z)^2
+    return 1 - value * value;
+  }
+
+}
Index: ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java
===================================================================
--- ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java (revision 0)
+++ ml/src/test/java/org/apache/hama/ml/perception/TestSmallMLPMessage.java (working copy)
@@ -0,0 +1,89 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hama.ml.perception; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.junit.Test; + + +/** + * Test the functionalities of SmallMLPMessage + * + */ +public class TestSmallMLPMessage { + + @Test + public void testReadWrite() { + int owner = 101; + double[][] mat = { { 1, 2, 3 }, { 4, 5, 6 }, { 7, 8, 9 } }; + + double[][] mat2 = { { 10, 20 }, { 30, 40 }, { 50, 60 } }; + + double[][][] mats = { mat, mat2 }; + + DenseDoubleMatrix[] matrices = new DenseDoubleMatrix[] { + new DenseDoubleMatrix(mat), new DenseDoubleMatrix(mat2) }; + + SmallMLPMessage message = new SmallMLPMessage(owner, true, matrices); + + Configuration conf = new Configuration(); + String strPath = "/tmp/testSmallMLPMessage"; + Path path = new Path(strPath); + try { + FileSystem fs = FileSystem.get(new URI(strPath), conf); + FSDataOutputStream out = fs.create(path, true); + message.write(out); + out.close(); + + FSDataInputStream in = fs.open(path); + SmallMLPMessage outMessage = new SmallMLPMessage(0, false, null); + outMessage.readFields(in); + + assertEquals(owner, outMessage.getOwner()); + DenseDoubleMatrix[] outMatrices = outMessage.getWeightsUpdatedMatrices(); + // check each matrix + for (int i = 0; i < outMatrices.length; ++i) { + double[][] outMat = outMessage.getWeightsUpdatedMatrices()[i] + .getValues(); + for (int j = 0; j < outMat.length; ++j) { + assertArrayEquals(mats[i][j], outMat[j], 0.0001); + } + } + + fs.delete(path, true); + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + + } +} Index: ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java =================================================================== --- ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java (revision 0) +++ ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java (working copy) @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.ml.perception; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DenseDoubleVector; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.math.DoubleVector; +import org.apache.hama.ml.writable.MatrixWritable; +import org.apache.hama.ml.writable.VectorWritable; +import org.junit.Test; + +public class TestSmallMultiLayerPerceptron { + + /** + * Write the parameters of an MLP to a file, read them back, and verify + * all meta-data fields. + */ + @Test + public void testWriteReadMLP() { + String modelPath = "/tmp/sampleModel-testWriteReadMLP.data"; + double learningRate = 0.5; + boolean regularization = false; // no regularization + double momentum = 0; // no momentum + String squashingFunctionName = "Sigmoid"; + String costFunctionName = "SquaredError"; + int[] layerSizeArray = new int[] { 3, 2, 2, 3 }; + MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, + regularization, momentum, squashingFunctionName, costFunctionName, + layerSizeArray); + try { + mlp.writeModelToFile(modelPath); + } catch (IOException e) { + e.printStackTrace(); + } + + try { + // read the meta-data + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + mlp = new SmallMultiLayerPerceptron(modelPath); + assertEquals("SmallMLP", mlp.getMLPType()); + assertEquals(learningRate, mlp.getLearningRate(), 0.001); + assertEquals(regularization, mlp.isRegularization()); + assertEquals(layerSizeArray.length, mlp.getNumberOfLayers()); + assertEquals(momentum, mlp.getMomentum(), 0.001); + assertEquals(squashingFunctionName, mlp.getSquashingFunctionName()); + assertEquals(costFunctionName, mlp.getCostFunctionName()); + assertArrayEquals(layerSizeArray, mlp.getLayerSizeArray()); + // delete test file + fs.delete(new Path(modelPath), true); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Test the feed-forward output of an example MLP.
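+ * The model meta-data is written by hand so the expected output can be + * verified independently. Note the matrix shapes: the weight matrix between + * layer i and layer i+1 has layerSizeArray[i] + 1 rows and + * layerSizeArray[i+1] columns (the extra row presumably carries the bias + * weights, which is why matrix01 below is 4 by 2 for the 3-to-2 layer pair).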
+ */ + @Test + public void testOutput() { + // write the MLP meta-data manually + String modelPath = "/tmp/sampleModel-testOutput.data"; + Configuration conf = new Configuration(); + try { + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream output = fs.create(new Path(modelPath), true); + + String mlpType = "SmallMLP"; + double learningRate = 0.5; + boolean regularization = false; + double momentum = 0; + String squashingFunctionName = "Sigmoid"; + String costFunctionName = "SquaredError"; + int[] layerSizeArray = new int[] { 3, 2, 3, 3 }; + int numberOfLayers = layerSizeArray.length; + + WritableUtils.writeString(output, mlpType); + output.writeDouble(learningRate); + output.writeBoolean(regularization); + output.writeDouble(momentum); + output.writeInt(numberOfLayers); + WritableUtils.writeString(output, squashingFunctionName); + WritableUtils.writeString(output, costFunctionName); + + // write the number of neurons for each layer + for (int i = 0; i < numberOfLayers; ++i) { + output.writeInt(layerSizeArray[i]); + } + + double[][] matrix01 = { // 4 by 2 + { 0.5, 0.2 }, { 0.1, 0.1 }, { 0.2, 0.5 }, { 0.1, 0.5 } }; + + double[][] matrix12 = { // 3 by 3 + { 0.1, 0.2, 0.5 }, { 0.2, 0.5, 0.2 }, { 0.5, 0.5, 0.1 } }; + + double[][] matrix23 = { // 4 by 3 + { 0.2, 0.5, 0.2 }, { 0.5, 0.1, 0.5 }, { 0.1, 0.2, 0.1 }, + { 0.1, 0.2, 0.5 } }; + + DoubleMatrix[] matrices = { new DenseDoubleMatrix(matrix01), + new DenseDoubleMatrix(matrix12), new DenseDoubleMatrix(matrix23) }; + for (DoubleMatrix mat : matrices) { + MatrixWritable.write(mat, output); + } + output.close(); + + } catch (IOException e) { + e.printStackTrace(); + } + + // initialize the MLP with the existing model meta-data and get the output + MultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(modelPath); + DoubleVector input = new DenseDoubleVector(new double[] { 1, 2, 3 }); + try { + DoubleVector result = mlp.output(input); + assertArrayEquals(new double[] { 0.6636557, 0.7009963, 0.7213835 }, + result.toArray(), 0.0001); + } catch (Exception e1) { + e1.printStackTrace(); + } + + // delete meta-data + try { + FileSystem fs = FileSystem.get(conf); + fs.delete(new Path(modelPath), true); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + /** + * Test the MLP on the XOR problem, training on one randomly chosen + * instance at a time.
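+ * Each three-element training vector encodes one row of the XOR truth + * table: the first two entries are the inputs and the last entry is the + * expected output, i.e. (0,0) -> 0, (0,1) -> 1, (1,0) -> 1, (1,1) -> 0.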
+ */ + @Test + public void testSingleInstanceTraining() { + // generate training data + DoubleVector[] trainingData = new DenseDoubleVector[] { + new DenseDoubleVector(new double[] { 0, 0, 0 }), + new DenseDoubleVector(new double[] { 0, 1, 1 }), + new DenseDoubleVector(new double[] { 1, 0, 1 }), + new DenseDoubleVector(new double[] { 1, 1, 0 }) }; + + // set parameters + double learningRate = 0.6; + boolean regularization = false; // no regularization + double momentum = 0; // no momentum + String squashingFunctionName = "Sigmoid"; + String costFunctionName = "SquaredError"; + int[] layerSizeArray = new int[] { 2, 5, 1 }; + SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, + regularization, momentum, squashingFunctionName, costFunctionName, + layerSizeArray); + + try { + // train by feeding one randomly chosen instance at a time + Random rnd = new Random(); + for (int i = 0; i < 30000; ++i) { + DenseDoubleMatrix[] weightUpdates = mlp + .trainByInstance(trainingData[rnd.nextInt(4)]); + mlp.updateWeightMatrices(weightUpdates); + } + + // System.out.printf("Weight matrices: %s\n", + // mlp.weightsToString(mlp.getWeightMatrices())); + for (int i = 0; i < trainingData.length; ++i) { + DenseDoubleVector testVec = (DenseDoubleVector) trainingData[i] + .slice(2); + assertEquals(trainingData[i].toArray()[2], mlp.output(testVec) + .toArray()[0], 0.2); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Test distributed training of the MLP on the XOR problem. + */ + @Test + public void testTrainingByXOR() { + // write some training instances + Configuration conf = new Configuration(); + String strDataPath = "/tmp/xor-training-by-xor"; + Path dataPath = new Path(strDataPath); + + // generate training data + DoubleVector[] trainingData = new DenseDoubleVector[] { + new DenseDoubleVector(new double[] { 0, 0, 0 }), + new DenseDoubleVector(new double[] { 0, 1, 1 }), + new DenseDoubleVector(new double[] { 1, 0, 1 }), + new DenseDoubleVector(new double[] { 1, 1, 0 }) }; + + try { + URI uri = new URI(strDataPath); + FileSystem fs = FileSystem.get(uri, conf); + fs.delete(dataPath, true); + if (!fs.exists(dataPath)) { + fs.createNewFile(dataPath); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, + dataPath, LongWritable.class, VectorWritable.class); + + for (int i = 0; i < 1000; ++i) { + VectorWritable vecWritable = new VectorWritable(trainingData[i % 4]); + writer.append(new LongWritable(i), vecWritable); + } + writer.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + // begin training + String modelPath = "/tmp/xorModel-training-by-xor.data"; + double learningRate = 0.6; + boolean regularization = false; // no regularization + double momentum = 0; // no momentum + String squashingFunctionName = "Tanh"; + String costFunctionName = "SquaredError"; + int[] layerSizeArray = new int[] { 2, 5, 1 }; + SmallMultiLayerPerceptron mlp = new SmallMultiLayerPerceptron(learningRate, + regularization, momentum, squashingFunctionName, costFunctionName, + layerSizeArray); + + Map<String, String> trainingParams = new HashMap<String, String>(); + trainingParams.put("training.iteration", "10000"); + trainingParams.put("training.mode", "minibatch.gradient.descent"); + trainingParams.put("training.batch.size", "100"); + trainingParams.put("tasks", "3"); + trainingParams.put("modelPath", modelPath); + + try { + mlp.train(dataPath, trainingParams); + } catch (Exception e) { + e.printStackTrace(); + } + + // test the model + for (int i = 0; i < trainingData.length; ++i) { + DenseDoubleVector testVec = (DenseDoubleVector)
trainingData[i].slice(2); + try { + DenseDoubleVector actual = (DenseDoubleVector) mlp.output(testVec); + assertEquals(trainingData[i].toArray()[2], actual.get(0), 0.2); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +}
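A note on the derivative convention used by Tanh above (and, by analogy, presumably by Sigmoid as well): calculateDerivative receives the already-squashed output rather than the raw activation, because d/dx tanh(x) = 1 - tanh(x)^2 can be written purely in terms of the output tanh(x). A minimal standalone check of both identities, using only java.lang.Math (no Hama classes; the class name is illustrative):

public class DerivativeConventionCheck {

  private static double sigmoid(double x) {
    return 1.0 / (1.0 + Math.exp(-x));
  }

  public static void main(String[] args) {
    double x = 0.7;  // arbitrary raw activation
    double h = 1e-6; // finite-difference step

    // tanh: d/dx tanh(x) = 1 - tanh(x)^2, expressible via the output alone
    double out = Math.tanh(x);
    double numeric = (Math.tanh(x + h) - Math.tanh(x - h)) / (2 * h);
    double viaOutput = 1 - out * out; // mirrors Tanh.calculateDerivative
    System.out.printf("tanh:    numeric=%.8f viaOutput=%.8f%n", numeric, viaOutput);

    // sigmoid: d/dx sigmoid(x) = sigmoid(x) * (1 - sigmoid(x))
    double s = sigmoid(x);
    double numericS = (sigmoid(x + h) - sigmoid(x - h)) / (2 * h);
    double viaOutputS = s * (1 - s);
    System.out.printf("sigmoid: numeric=%.8f viaOutput=%.8f%n", numericS, viaOutputS);
  }
}

Both pairs agree to high precision, which is why the training code can hand layer outputs, rather than raw weighted sums, to calculateDerivative.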
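TestSmallMLPMessage round-trips the message through a file under /tmp. The same Writable contract could also be exercised entirely in memory with Hadoop's DataOutputBuffer and DataInputBuffer, avoiding file-system state in a unit test. A sketch, assuming the same SmallMLPMessage constructor and accessors used in testReadWrite:

package org.apache.hama.ml.perception;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class InMemoryRoundTrip {

  // Serialize the message into a byte buffer and read it back, no HDFS needed.
  static SmallMLPMessage roundTrip(SmallMLPMessage message) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    message.write(out); // same Writable.write(DataOutput) as the file-based test

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength()); // expose exactly the written bytes
    SmallMLPMessage copy = new SmallMLPMessage(0, false, null);
    copy.readFields(in);
    return copy;
  }
}

Assertions on getOwner() and getWeightsUpdatedMatrices() would then work exactly as in testReadWrite.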