Index: ml/src/main/java/org/apache/hama/ml/ann/AbstractLayeredNeuralNetwork.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/AbstractLayeredNeuralNetwork.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/AbstractLayeredNeuralNetwork.java (working copy) @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.ml.math.DoubleDoubleFunction; +import org.apache.hama.ml.math.DoubleFunction; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.math.DoubleVector; +import org.apache.hama.ml.math.FunctionFactory; + +import com.google.common.base.Preconditions; + +/** + * AbstractLayeredNeuralNetwork defines the general operations for derivative + * layered models, include Linear Regression, Logistic Regression, Multilayer + * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc. + * + * In general, these models consist of neurons which are aligned in layers. + * Between layers, for any two adjacent layers, the neurons are connected to + * form a bipartite weighted graph. + * + */ +abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork { + + private static final double DEFAULT_REGULARIZATION_WEIGHT = 0; + private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1; + + double trainingError; + + /* The weight of regularization */ + protected double regularizationWeight; + + /* The momentumWeight */ + protected double momentumWeight; + + /* The cost function of the model */ + protected DoubleDoubleFunction costFunction; + + /* Record the size of each layer */ + protected List layerSizeList; + + protected TrainingMethod trainingMethod; + + public static enum TrainingMethod { + GRADIATE_DESCENT + } + + public AbstractLayeredNeuralNetwork() { + this.regularizationWeight = DEFAULT_REGULARIZATION_WEIGHT; + this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT; + this.trainingMethod = TrainingMethod.GRADIATE_DESCENT; + } + + public AbstractLayeredNeuralNetwork(String modelPath) { + super(modelPath); + } + + /** + * Set the regularization weight. Recommend in the range [0, 0.1), More + * complex the model is, less weight the regularization is. 
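+ * A nonzero weight adds a penalty on the magnitude of the model weights during training, which helps to reduce overfitting.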
+ * + * @param regularization + */ + public void setRegularizationWeight(double regularizationWeight) { + Preconditions.checkArgument(regularizationWeight >= 0 + && regularizationWeight < 1.0, + "Regularization weight must be in range [0, 1.0)"); + this.regularizationWeight = regularizationWeight; + } + + public double getRegularizationWeight() { + return this.regularizationWeight; + } + + /** + * Set the momemtum weight for the model. Recommend in range [0, 0.5]. + * + * @param momentumWeight + */ + public void setMomemtumWeight(double momentumWeight) { + Preconditions.checkArgument(momentumWeight >= 0 && momentumWeight <= 1.0, + "Momentum weight must be in range [0, 1.0]"); + this.momentumWeight = momentumWeight; + } + + public double getMomemtumWeight() { + return this.momentumWeight; + } + + public void setTrainingMethod(TrainingMethod method) { + this.trainingMethod = method; + } + + public TrainingMethod getTrainingMethod() { + return this.trainingMethod; + } + + /** + * Set the cost function for the model. + * + * @param costFunctionName + */ + public void setCostFunction(DoubleDoubleFunction costFunction) { + this.costFunction = costFunction; + } + + /** + * Add a layer of neurons with specified size. If the added layer is not the + * first layer, it will automatically connects the neurons between with the + * previous layer. + * + * @param size + * @param isFinalLayer If false, add a bias neuron. + * @param squashingFunction The squashing function for this layer, input layer + * is f(x) = x by default. + * @return The layer index, starts with 0. + */ + public abstract int addLayer(int size, boolean isFinalLayer, + DoubleFunction squashingFunction); + + /** + * Get the size of a particular layer. + * + * @param layer + * @return + */ + public int getLayerSize(int layer) { + Preconditions.checkArgument( + layer >= 0 && layer < this.layerSizeList.size(), + String.format("Input must be in range [0, %d]\n", + this.layerSizeList.size() - 1)); + return this.layerSizeList.get(layer); + } + + /** + * Get the layer size list. + * + * @return + */ + protected List getLayerSizeList() { + return this.layerSizeList; + } + + /** + * Get the weights between layer layerIdx and layerIdx + 1 + * + * @param layerIdx The index of the layer + * @return The weights in form of {@link DoubleMatrix} + */ + public abstract DoubleMatrix getWeightsByLayer(int layerIdx); + + /** + * Get the updated weights using one training instance. + * + * @param trainingInstance The trainingInstance is the concatenation of + * feature vector and class label vector. + * @return The update of each weight, in form of matrix list. + * @throws Exception + */ + public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance); + + /** + * Get the output calculated by the model. + * + * @param instance The feature instance. + * @return + */ + public abstract DoubleVector getOutput(DoubleVector instance); + + /** + * Calculate the training error based on the labels and outputs. 
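+ * The computed error is stored in the trainingError field rather than returned.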
+ * + * @param labels + * @param output + * @return + */ + protected abstract void calculateTrainingError(DoubleVector labels, + DoubleVector output); + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + // read regularization weight + this.regularizationWeight = input.readDouble(); + // read momentum weight + this.momentumWeight = input.readDouble(); + + // read cost function + this.costFunction = FunctionFactory + .createDoubleDoubleFunction(WritableUtils.readString(input)); + + // read layer size list + int numLayers = input.readInt(); + this.layerSizeList = new ArrayList(); + for (int i = 0; i < numLayers; ++i) { + this.layerSizeList.add(input.readInt()); + } + + this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class); + } + + @Override + public void write(DataOutput output) throws IOException { + super.write(output); + // write regularization weight + output.writeDouble(this.regularizationWeight); + // write momentum weight + output.writeDouble(this.momentumWeight); + + // write cost function + WritableUtils.writeString(output, costFunction.getFunctionName()); + + // write layer size list + output.writeInt(this.layerSizeList.size()); + for (int i = 0; i < this.layerSizeList.size(); ++i) { + output.writeInt(this.layerSizeList.get(i)); + } + + WritableUtils.writeEnum(output, this.trainingMethod); + } + +} Index: ml/src/main/java/org/apache/hama/ml/ann/NeuralNetwork.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/NeuralNetwork.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/NeuralNetwork.java (working copy) @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import com.google.common.base.Preconditions; + +/** + * NeuralNetwork defines the general operations for all the derivative models. + * Typically, all derivative models such as Linear Regression, Logistic + * Regression, and Multilayer Perceptron consist of neurons and the weights + * between neurons. 
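+ * Besides the weights, a NeuralNetwork keeps the learning rate and the model path that is used to persist and reload the model through the Writable interface.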
+ * + */ +abstract class NeuralNetwork implements Writable { + + private static final double DEFAULT_LEARNING_RATE = 0.5; + + protected double learningRate; + protected boolean learningRateDecay = false; + + // the name of the model + protected String modelType; + // the path to store the model + protected String modelPath; + + public NeuralNetwork() { + this.learningRate = DEFAULT_LEARNING_RATE; + this.modelType = this.getClass().getSimpleName(); + } + + public NeuralNetwork(String modelPath) { + try { + this.modelPath = modelPath; + this.readFromModel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Set the degree of aggression during model training, a large learning rate + * can increase the training speed, but it also decrease the chance of model + * converge. Recommend in range (0, 0.3). + * + * @param learningRate + */ + public void setLearningRate(double learningRate) { + Preconditions.checkArgument(learningRate > 0, + "Learning rate must larger than 0."); + this.learningRate = learningRate; + } + + public double getLearningRate() { + return this.learningRate; + } + + public void isLearningRateDecay(boolean decay) { + this.learningRateDecay = decay; + } + + public String getModelType() { + return this.modelType; + } + + /** + * Train the model with the path of given training data and parameters. + * + * @param dataInputPath The path of the training data. + * @param trainingParams The parameters for training. + * @throws IOException + */ + public void train(Path dataInputPath, Map trainingParams) { + Preconditions.checkArgument(this.modelPath != null, + "Please set the model path before training."); + // train with BSP job + try { + trainInternal(dataInputPath, trainingParams); + // write the trained model back to model path + this.readFromModel(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + + /** + * Train the model with the path of given training data and parameters. + * + * @param dataInputPath + * @param trainingParams + */ + protected abstract void trainInternal(Path dataInputPath, + Map trainingParams) throws IOException, + InterruptedException, ClassNotFoundException; + + /** + * Read the model meta-data from the specified location. + * + * @throws IOException + */ + protected void readFromModel() throws IOException { + Preconditions.checkArgument(this.modelPath != null, + "Model path has not been set."); + Configuration conf = new Configuration(); + try { + URI uri = new URI(this.modelPath); + FileSystem fs = FileSystem.get(uri, conf); + FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath))); + this.readFields(is); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + + /** + * Write the model data to specified location. + * + * @param modelPath The location in file system to store the model. + * @throws IOException + */ + public void writeModelToFile() throws IOException { + Preconditions.checkArgument(this.modelPath != null, + "Model path has not been set."); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream stream = fs.create(new Path(this.modelPath), true); + this.write(stream); + stream.close(); + } + + /** + * Set the model path. + * + * @param modelPath + */ + public void setModelPath(String modelPath) { + this.modelPath = modelPath; + } + + /** + * Get the model path. 
+ * + * @return + */ + public String getModelPath() { + return this.modelPath; + } + + public void readFields(DataInput input) throws IOException { + // read model type + this.modelType = WritableUtils.readString(input); + // read learning rate + this.learningRate = input.readDouble(); + // read model path + this.modelPath = WritableUtils.readString(input); + if (this.modelPath.equals("null")) { + this.modelPath = null; + } + } + + public void write(DataOutput output) throws IOException { + // write model type + WritableUtils.writeString(output, modelType); + // write learning rate + output.writeDouble(learningRate); + // write model path + if (this.modelPath != null) { + WritableUtils.writeString(output, modelPath); + } else { + WritableUtils.writeString(output, "null"); + } + } + +} Index: ml/src/main/java/org/apache/hama/ml/ann/NeuralNetworkTrainer.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/NeuralNetworkTrainer.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/NeuralNetworkTrainer.java (working copy) @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.ml.perception.MLPMessage; +import org.apache.hama.ml.writable.VectorWritable; + +/** + * The trainer that is used to train the {@link SmallLayeredNeuralNetwork} with + * BSP. The trainer would read the training data and obtain the trained + * parameters of the model. + * + */ +public abstract class NeuralNetworkTrainer extends + BSP { + + protected static final Log LOG = LogFactory + .getLog(NeuralNetworkTrainer.class); + + protected Configuration conf; + protected int maxIteration; + protected int batchSize; + protected String trainingMode; + + @Override + final public void setup( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + conf = peer.getConfiguration(); + + this.extraSetup(peer); + } + + /** + * Handle extra setup for sub-classes. 
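+ * The default implementation does nothing.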
+ * + * @param peer + * @throws IOException + * @throws SyncException + * @throws InterruptedException + */ + protected void extraSetup( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + } + + /** + * {@inheritDoc} + */ + @Override + public abstract void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException; + + @Override + public void cleanup( + BSPPeer peer) + throws IOException { + this.extraCleanup(peer); + // write model to modelPath + } + + /** + * Handle cleanup for sub-classes. Write the trained model back. + * + * @param peer + * @throws IOException + * @throws SyncException + * @throws InterruptedException + */ + protected void extraCleanup( + BSPPeer peer) + throws IOException { + + } + +} Index: ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetwork.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetwork.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetwork.java (working copy) @@ -0,0 +1,557 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DenseDoubleVector; +import org.apache.hama.ml.math.DoubleFunction; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.math.DoubleVector; +import org.apache.hama.ml.math.FunctionFactory; +import org.apache.hama.ml.writable.MatrixWritable; +import org.apache.hama.ml.writable.VectorWritable; +import org.mortbay.log.Log; + +import com.google.common.base.Preconditions; + +/** + * SmallLayeredNeuralNetwork defines the general operations for derivative + * layered models, include Linear Regression, Logistic Regression, Multilayer + * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc. For + * SmallLayeredNeuralNetwork, the training can be conducted in parallel, but the + * parameters of the models are assumes to be stored in a single machine. + * + * In general, these models consist of neurons which are aligned in layers. + * Between layers, for any two adjacent layers, the neurons are connected to + * form a bipartite weighted graph. 
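+ * Each non-final layer additionally contains a bias neuron whose output is fixed close to 1.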
+ * + */ +public class SmallLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork { + + /* Weights between neurons at adjacent layers */ + protected List weightMatrixList; + + /* Previous weight updates between neurons at adjacent layers */ + protected List prevWeightUpdatesList; + + /* Different layers can have different squashing function */ + protected List squashingFunctionList; + + protected int finalLayerIdx; + + public SmallLayeredNeuralNetwork() { + this.layerSizeList = new ArrayList(); + this.weightMatrixList = new ArrayList(); + this.prevWeightUpdatesList = new ArrayList(); + this.squashingFunctionList = new ArrayList(); + } + + public SmallLayeredNeuralNetwork(String modelPath) { + super(modelPath); + } + + @Override + /** + * {@inheritDoc} + */ + public int addLayer(int size, boolean isFinalLayer, + DoubleFunction squashingFunction) { + Preconditions.checkArgument(size > 0, "Size of layer must larger than 0."); + if (!isFinalLayer) { + size += 1; + } + + this.layerSizeList.add(size); + int layerIdx = this.layerSizeList.size() - 1; + if (isFinalLayer) { + this.finalLayerIdx = layerIdx; + } + + // add weights between current layer and previous layer, and input layer has + // no squashing function + if (layerIdx > 0) { + int sizePrevLayer = this.layerSizeList.get(layerIdx - 1); + // row count equals to size of current size and column count equals to + // size of previous layer + int row = isFinalLayer ? size : size - 1; + int col = sizePrevLayer; + DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col); + // initialize weights + final Random rnd = new Random(); + weightMatrix.applyToElements(new DoubleFunction() { + @Override + public double apply(double value) { + return rnd.nextDouble() - 0.5; + } + + @Override + public double applyDerivative(double value) { + throw new UnsupportedOperationException(""); + } + }); + this.weightMatrixList.add(weightMatrix); + this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col)); + this.squashingFunctionList.add(squashingFunction); + } + return layerIdx; + } + + /** + * Update the weight matrices with given matrices. + * + * @param matrices + */ + public void updateWeightMatrices(DoubleMatrix[] matrices) { + for (int i = 0; i < matrices.length; ++i) { + DoubleMatrix matrix = this.weightMatrixList.get(i); + this.weightMatrixList.set(i, matrix.add(matrices[i])); + } + } + + void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) { + this.prevWeightUpdatesList.clear(); + for (DoubleMatrix prevUpdate : prevUpdates) { + this.prevWeightUpdatesList.add(prevUpdate); + } + } + + /** + * Add a batch of matrices onto the given destination matrices. + * + * @param destMatrices + * @param sourceMatrices + */ + static void matricesAdd(DoubleMatrix[] destMatrices, + DoubleMatrix[] sourceMatrices) { + for (int i = 0; i < destMatrices.length; ++i) { + destMatrices[i] = destMatrices[i].add(sourceMatrices[i]); + } + } + + /** + * Get all the weight matrices. + * + * @return + */ + DoubleMatrix[] getWeightMatrices() { + DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()]; + this.weightMatrixList.toArray(matrices); + return matrices; + } + + /** + * Set the weight matrices. + * + * @param matrices + */ + public void setWeightMatrices(DoubleMatrix[] matrices) { + this.weightMatrixList = new ArrayList(); + for (int i = 0; i < matrices.length; ++i) { + this.weightMatrixList.add(matrices[i]); + } + } + + /** + * Get the previous matrices updates in form of array. 
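+ * The previous updates supply the momentum term for the next weight update.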
+ * + * @return + */ + public DoubleMatrix[] getPrevMatricesUpdates() { + DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList + .size()]; + for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) { + prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i); + } + return prevMatricesUpdates; + } + + public void setWeightMatrix(int index, DoubleMatrix matrix) { + Preconditions.checkArgument( + 0 <= index && index < this.weightMatrixList.size(), + String.format("index [%d] out of range.", index)); + this.weightMatrixList.set(index, matrix); + } + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + + // read squash functions + int squashingFunctionSize = input.readInt(); + this.squashingFunctionList = new ArrayList(); + for (int i = 0; i < squashingFunctionSize; ++i) { + this.squashingFunctionList.add(FunctionFactory + .createDoubleFunction(WritableUtils.readString(input))); + } + + // read weights and construct matrices of previous updates + int numOfMatrices = input.readInt(); + this.weightMatrixList = new ArrayList(); + this.prevWeightUpdatesList = new ArrayList(); + for (int i = 0; i < numOfMatrices; ++i) { + DoubleMatrix matrix = MatrixWritable.read(input); + this.weightMatrixList.add(matrix); + this.prevWeightUpdatesList.add(new DenseDoubleMatrix( + matrix.getRowCount(), matrix.getColumnCount())); + } + + } + + @Override + public void write(DataOutput output) throws IOException { + super.write(output); + + // write squashing functions + output.writeInt(this.squashingFunctionList.size()); + for (int i = 0; i < this.squashingFunctionList.size(); ++i) { + WritableUtils.writeString(output, this.squashingFunctionList.get(i) + .getFunctionName()); + } + + // write weight matrices + output.writeInt(this.weightMatrixList.size()); + for (int i = 0; i < this.weightMatrixList.size(); ++i) { + MatrixWritable.write(this.weightMatrixList.get(i), output); + } + + // DO NOT WRITE WEIGHT UPDATE + } + + @Override + public DoubleMatrix getWeightsByLayer(int layerIdx) { + return this.weightMatrixList.get(layerIdx); + } + + /** + * Get the output of the model according to given feature instance. + */ + public DoubleVector getOutput(DoubleVector instance) { + Preconditions.checkArgument(this.layerSizeList.get(0) == instance + .getDimension() + 1, String.format( + "The dimension of input instance should be %d.", + this.layerSizeList.get(0) - 1)); + // add bias feature + DoubleVector instanceWithBias = new DenseDoubleVector( + instance.getDimension() + 1); + instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than + // 1.0 + for (int i = 1; i < instanceWithBias.getDimension(); ++i) { + instanceWithBias.set(i, instance.get(i - 1)); + } + + List outputCache = getOutputInternal(instanceWithBias); + // return the output of the last layer + DoubleVector result = outputCache.get(outputCache.size() - 1); + // remove bias + return result.sliceUnsafe(1, result.getDimension() - 1); + } + + /** + * Calculate output internally, the intermediate output of each layer will be + * stored. + * + * @param instance The instance contains the features. + * @return Cached output of each layer. 
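+ * The first element is the bias-augmented input and the last element is the output of the final layer.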
+ */ + public List getOutputInternal(DoubleVector instance) { + List outputCache = new ArrayList(); + // fill with instance + DoubleVector intermediateOutput = instance; + outputCache.add(intermediateOutput); + // System.out.printf("Input layer: %s\n", intermediateOutput.toString()); + + for (int i = 0; i < this.layerSizeList.size() - 1; ++i) { + intermediateOutput = forward(i, intermediateOutput); + outputCache.add(intermediateOutput); + } + return outputCache; + } + + /** + * Forward the calculation for one layer. + * + * @param fromLayer The index of the previous layer. + * @param intermediateOutput The intermediateOutput of previous layer. + * @return + */ + protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) { + DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer); + + DoubleVector vec = weightMatrix.multiplyVectorUnsafe(intermediateOutput); + // System.out.printf("Before applying squashing, from Layer %d to %d: %s\n", + // fromLayer, fromLayer + 1, vec.toString()); + vec = vec.applyToElements(this.squashingFunctionList.get(fromLayer)); + // System.out.printf("After applying squashing, from Layer %d to %d: %s\n", + // fromLayer, fromLayer + 1, vec.toString()); + + // add bias + DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1); + vecWithBias.set(0, 1); + for (int i = 0; i < vec.getDimension(); ++i) { + vecWithBias.set(i + 1, vec.get(i)); + } + return vecWithBias; + } + + /** + * Train the model online. + * + * @param trainingInstance + */ + public void trainOnline(DoubleVector trainingInstance) { + DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance); + // System.out.printf("Sum: %f\n", updateMatrices[0].sum()); + this.updateWeightMatrices(updateMatrices); + } + + @Override + public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) { + // validate training instance + int inputDimension = this.layerSizeList.get(0) - 1; + int outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1); + Preconditions.checkArgument( + inputDimension + outputDimension == trainingInstance.getDimension(), + String.format( + "The dimension of training instance is %d, but requires %d.", + trainingInstance.getDimension(), inputDimension + outputDimension)); + + // prepare the features and labels + DoubleVector inputInstance = new DenseDoubleVector( + this.layerSizeList.get(0)); + inputInstance.set(0, 1); // add bias + for (int i = 0; i < inputDimension; ++i) { + inputInstance.set(i + 1, trainingInstance.get(i)); + } + + DoubleVector labels = trainingInstance.sliceUnsafe( + inputInstance.getDimension() - 1, trainingInstance.getDimension() - 1); + + List internalResults = this.getOutputInternal(inputInstance); + DoubleVector output = internalResults.get(internalResults.size() - 1); + + // get the training error + calculateTrainingError(labels, + output.deepCopy().sliceUnsafe(1, output.getDimension() - 1)); + + if (this.trainingMethod.equals(TrainingMethod.GRADIATE_DESCENT)) { + return this.trainByInstanceGradientDescent(labels, internalResults); + } + throw new IllegalArgumentException( + String.format("Training method is not supported.")); + } + + /** + * Train by gradient descent. Get the updated weights using one training + * instance. + * + * @param trainingInstance + * @return The weight update matrices. 
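+ * The returned matrices are also recorded as the previous weight updates used for the momentum term.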
+ */ + private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels, + List internalResults) { + + DoubleVector output = internalResults.get(internalResults.size() - 1); + // initialize weight update matrices + DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList + .size()]; + for (int m = 0; m < weightUpdateMatrices.length; ++m) { + weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList + .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount()); + } + DoubleVector deltaVec = new DenseDoubleVector( + this.layerSizeList.get(this.layerSizeList.size() - 1)); + + // // calculate norm-2 error ||t - o||^2 + // DoubleVector errorVec = output.slice(output.getDimension() - + // 1).applyToElements(labels, new DoubleDoubleFunction() { + // @Override + // public double apply(double x1, double x2) { + // double v = x1 - x2; + // return v * v; + // } + // @Override + // public double applyDerivative(double x1, double x2) { + // throw new UnsupportedOperationException(); + // } + // }); + // double error = errorVec.sum(); + // System.out.printf("Error: %f\n", error); + + // System.out.printf("Output: %s\n", output); + + DoubleFunction squashingFunction = this.squashingFunctionList + .get(this.squashingFunctionList.size() - 1); + + DoubleMatrix lastWeightMatrix = this.weightMatrixList + .get(this.weightMatrixList.size() - 1); + for (int i = 0; i < deltaVec.getDimension(); ++i) { + double costFuncDerivative = this.costFunction.applyDerivative( + labels.get(i), output.get(i + 1)); + // add regularization + costFuncDerivative += this.regularizationWeight + * lastWeightMatrix.getRowVector(i).sum(); + deltaVec.set(i, costFuncDerivative); + deltaVec.set( + i, + deltaVec.get(i) + * squashingFunction.applyDerivative(output.get(i + 1))); + } + + // System.out.printf("Delta output: %s\n", deltaVec.toString()); + + // start from previous layer of output layer + for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) { + output = internalResults.get(layer); + deltaVec = backpropagate(layer, deltaVec, internalResults, + weightUpdateMatrices[layer]); + } + + this.setPrevWeightMatrices(weightUpdateMatrices); + + return weightUpdateMatrices; + } + + /** + * Back-propagate the errors to from next layer to current layer. The weight + * updated information will be stored in the weightUpdateMatrices, and the + * delta of the prevLayer would be returned. + * + * @param layer Index of current layer. + * @param internalOutput Internal output of current layer. + * @param deltaVec Delta of next layer. 
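+ * @param weightUpdateMatrix The matrix that receives the weight updates computed for this layer.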
+ * @return + */ + private DoubleVector backpropagate(int curLayerIdx, + DoubleVector nextLayerDelta, List outputCache, + DenseDoubleMatrix weightUpdateMatrix) { + + // get layer related information + DoubleFunction squashingFunction = this.squashingFunctionList + .get(curLayerIdx); + DoubleVector curLayerOutput = outputCache.get(curLayerIdx); + DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx); + DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx); + + // next layer is not output layer, remove the delta of bias neuron + if (curLayerIdx != this.layerSizeList.size() - 2) { + nextLayerDelta = nextLayerDelta.slice(1, + nextLayerDelta.getDimension() - 1); + } + + DoubleVector delta = weightMatrix.transpose() + .multiplyVector(nextLayerDelta); + for (int i = 0; i < delta.getDimension(); ++i) { + delta.set( + i, + delta.get(i) + * squashingFunction.applyDerivative(curLayerOutput.get(i))); + } + + // System.out.printf("Delta layer: %d, %s\n", curLayerIdx, + // delta.toString()); + + // update weights + for (int i = 0; i < weightUpdateMatrix.getRowCount(); ++i) { + for (int j = 0; j < weightUpdateMatrix.getColumnCount(); ++j) { + weightUpdateMatrix.set(i, j, + -learningRate * nextLayerDelta.get(i) * curLayerOutput.get(j) + + this.momentumWeight * prevWeightMatrix.get(i, j)); + } + } + + // System.out.printf("Weight Layer %d, %s\n", curLayerIdx, + // weightUpdateMatrix.toString()); + + return delta; + } + + @Override + protected void trainInternal(Path dataInputPath, + Map trainingParams) throws IOException, + InterruptedException, ClassNotFoundException { + // add all training parameters to configuration + Configuration conf = new Configuration(); + for (Map.Entry entry : trainingParams.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + + // if training parameters contains the model path, update the model path + String modelPath = trainingParams.get("modelPath"); + if (modelPath != null) { + this.modelPath = modelPath; + } + // modelPath must be set before training + if (this.modelPath == null) { + throw new IllegalArgumentException( + "Please specify the modelPath for model, " + + "either through setModelPath() or add 'modelPath' to the training parameters."); + } + + conf.set("modelPath", this.modelPath); + this.writeModelToFile(); + + HamaConfiguration hamaConf = new HamaConfiguration(conf); + + // create job + BSPJob job = new BSPJob(hamaConf, SmallLayeredNeuralNetworkTrainer.class); + job.setJobName("Small scale Neural Network training"); + job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class); + job.setBspClass(SmallLayeredNeuralNetworkTrainer.class); + job.setInputPath(dataInputPath); + job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class); + job.setInputKeyClass(LongWritable.class); + job.setInputValueClass(VectorWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class); + + int numTasks = conf.getInt("tasks", 1); + job.setNumBspTask(numTasks); + job.waitForCompletion(true); + + // reload learned model + Log.info(String.format("Reload model from %s.", this.modelPath)); + this.readFromModel(); + + } + + @Override + protected void calculateTrainingError(DoubleVector labels, DoubleVector output) { + DoubleVector errors = labels.deepCopy().applyToElements(output, + this.costFunction); + // System.out.printf("Labels: %s\tOutput: %s\n", labels, output); + this.trainingError = errors.sum(); + // 
System.out.printf("Training error: %s\n", errors); + } + +} Index: ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkMessage.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkMessage.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkMessage.java (working copy) @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.writable.MatrixWritable; + +/** + * NeuralNetworkMessage transmits the messages between peers during the training + * of neural networks. + * + */ +public class SmallLayeredNeuralNetworkMessage implements Writable { + + protected double trainingError; + protected DoubleMatrix[] curMatrices; + protected DoubleMatrix[] prevMatrices; + protected boolean converge; + + public SmallLayeredNeuralNetworkMessage(double trainingError, + boolean converge, DoubleMatrix[] weightMatrices, + DoubleMatrix[] prevMatrices) { + this.trainingError = trainingError; + this.converge = converge; + this.curMatrices = weightMatrices; + this.prevMatrices = prevMatrices; + } + + @Override + public void readFields(DataInput input) throws IOException { + trainingError = input.readDouble(); + converge = input.readBoolean(); + int numMatrices = input.readInt(); + boolean hasPrevMatrices = input.readBoolean(); + curMatrices = new DenseDoubleMatrix[numMatrices]; + // read matrice updates + for (int i = 0; i < curMatrices.length; ++i) { + curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input); + } + + if (hasPrevMatrices) { + prevMatrices = new DenseDoubleMatrix[numMatrices]; + // read previous matrices updates + for (int i = 0; i < prevMatrices.length; ++i) { + prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input); + } + } + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeDouble(trainingError); + output.writeBoolean(converge); + output.writeInt(curMatrices.length); + if (prevMatrices == null) { + output.writeBoolean(false); + } else { + output.writeBoolean(true); + } + for (int i = 0; i < curMatrices.length; ++i) { + MatrixWritable.write(curMatrices[i], output); + } + if (prevMatrices != null) { + for (int i = 0; i < prevMatrices.length; ++i) { + MatrixWritable.write(prevMatrices[i], output); + } + } + } + + public double getTrainingError() { + return trainingError; + } + + public void setTrainingError(double trainingError) { + this.trainingError = trainingError; + } 
+ + public boolean isConverge() { + return converge; + } + + public void setConverge(boolean converge) { + this.converge = converge; + } + + public DoubleMatrix[] getCurMatrices() { + return curMatrices; + } + + public void setMatrices(DoubleMatrix[] curMatrices) { + this.curMatrices = curMatrices; + } + + public DoubleMatrix[] getPrevMatrices() { + return prevMatrices; + } + + public void setPrevMatrices(DoubleMatrix[] prevMatrices) { + this.prevMatrices = prevMatrices; + } + +} Index: ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkTrainer.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkTrainer.java (revision 0) +++ ml/src/main/java/org/apache/hama/ml/ann/SmallLayeredNeuralNetworkTrainer.java (working copy) @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.math.DoubleVector; +import org.apache.hama.ml.writable.VectorWritable; +import org.mortbay.log.Log; + +/** + * The trainer that train the {@link SmallLayeredNeuralNetwork} based on BSP + * framework. + * + */ +public final class SmallLayeredNeuralNetworkTrainer + extends + BSP { + + private SmallLayeredNeuralNetwork inMemoryModel; + private Configuration conf; + /* Default batch size */ + private int batchSize; + + /* check the interval between intervals */ + private double prevAvgTrainingError; + private double curAvgTrainingError; + private long convergenceCheckInterval; + private long iterations; + private long maxIterations; + private boolean isConverge; + + private String modelPath; + + @Override + /** + * If the model path is specified, load the existing from storage location. 
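+ * The maximum number of iterations, the convergence check interval, and the batch size are read from the job configuration.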
+ */ + public void setup( + BSPPeer peer) { + if (peer.getPeerIndex() == 0) { + Log.info("Begin to train"); + } + this.isConverge = false; + this.conf = peer.getConfiguration(); + this.iterations = 0; + this.modelPath = conf.get("modelPath"); + this.maxIterations = conf.getLong("training.max.iterations", 100000); + this.convergenceCheckInterval = conf.getLong("convergence.check.interval", + 1000); + this.modelPath = conf.get("modelPath"); + this.inMemoryModel = new SmallLayeredNeuralNetwork(modelPath); + this.prevAvgTrainingError = Integer.MAX_VALUE; + this.batchSize = conf.getInt("training.batch.size", 50); + } + + @Override + /** + * Write the trained model back to stored location. + */ + public void cleanup( + BSPPeer peer) { + // write model to modelPath + if (peer.getPeerIndex() == 0) { + try { + Log.info(String.format("End of training, number of iterations: %d.\n", + this.iterations)); + Log.info(String.format("Write model back to %s\n", + inMemoryModel.getModelPath())); + this.inMemoryModel.writeModelToFile(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @Override + public void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + while (this.iterations++ < maxIterations) { + // each groom calculate the matrices updates according to local data + calculateUpdates(peer); + peer.sync(); + + // master merge the updates model + if (peer.getPeerIndex() == 0) { + mergeUpdates(peer); + } + peer.sync(); + if (this.isConverge) { + break; + } + } + } + + /** + * Calculate the matrices updates according to local partition of data. + * + * @param peer + * @throws IOException + */ + private void calculateUpdates( + BSPPeer peer) + throws IOException { + // receive update information from master + if (peer.getNumCurrentMessages() != 0) { + SmallLayeredNeuralNetworkMessage inMessage = peer.getCurrentMessage(); + DoubleMatrix[] newWeights = inMessage.getCurMatrices(); + DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices(); + this.inMemoryModel.setWeightMatrices(newWeights); + this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates); + this.isConverge = inMessage.isConverge(); + // check converge + if (isConverge) { + return; + } + } + + DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList + .size()]; + for (int i = 0; i < weightUpdates.length; ++i) { + int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount(); + int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount(); + weightUpdates[i] = new DenseDoubleMatrix(row, col); + } + + // continue to train + double avgTrainingError = 0.0; + LongWritable key = new LongWritable(); + VectorWritable value = new VectorWritable(); + for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) { + if (peer.readNext(key, value) == false) { + peer.reopenInput(); + peer.readNext(key, value); + } + DoubleVector trainingInstance = value.getVector(); + SmallLayeredNeuralNetwork.matricesAdd(weightUpdates, + this.inMemoryModel.trainByInstance(trainingInstance)); + avgTrainingError += this.inMemoryModel.trainingError; + } + avgTrainingError /= batchSize; + + // calculate the average of updates + for (int i = 0; i < weightUpdates.length; ++i) { + weightUpdates[i] = weightUpdates[i].divide(batchSize); + } + + DoubleMatrix[] prevWeightUpdates = this.inMemoryModel + .getPrevMatricesUpdates(); + SmallLayeredNeuralNetworkMessage outMessage = new SmallLayeredNeuralNetworkMessage( + avgTrainingError, false, weightUpdates, prevWeightUpdates); + 
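+ // send the averaged local updates, together with the previous updates used for momentum, to the master peer (peer 0) for merging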
peer.send(peer.getPeerName(0), outMessage); + } + + /** + * Merge the updates according to the updates of the grooms. + * + * @param peer + * @throws IOException + */ + private void mergeUpdates( + BSPPeer peer) + throws IOException { + int numMessages = peer.getNumCurrentMessages(); + boolean isConverge = false; + if (numMessages == 0) { // converges + isConverge = true; + return; + } + + double avgTrainingError = 0; + DoubleMatrix[] matricesUpdates = null; + DoubleMatrix[] prevMatricesUpdates = null; + + while (peer.getNumCurrentMessages() > 0) { + SmallLayeredNeuralNetworkMessage message = peer.getCurrentMessage(); + if (matricesUpdates == null) { + matricesUpdates = message.getCurMatrices(); + prevMatricesUpdates = message.getPrevMatrices(); + } else { + SmallLayeredNeuralNetwork.matricesAdd(matricesUpdates, + message.getCurMatrices()); + SmallLayeredNeuralNetwork.matricesAdd(prevMatricesUpdates, + message.getPrevMatrices()); + } + avgTrainingError += message.getTrainingError(); + } + + if (numMessages != 1) { + avgTrainingError /= numMessages; + for (int i = 0; i < matricesUpdates.length; ++i) { + matricesUpdates[i] = matricesUpdates[i].divide(numMessages); + prevMatricesUpdates[i] = prevMatricesUpdates[i].divide(numMessages); + } + } + this.inMemoryModel.updateWeightMatrices(matricesUpdates); + this.inMemoryModel.setPrevWeightMatrices(prevMatricesUpdates); + + // check convergence + if (iterations % convergenceCheckInterval == 0) { + if (prevAvgTrainingError < curAvgTrainingError) { + // error cannot decrease any more + isConverge = true; + } + // update + prevAvgTrainingError = curAvgTrainingError; + curAvgTrainingError = 0; + } + curAvgTrainingError += avgTrainingError / convergenceCheckInterval; + + // broadcast updated weight matrices + for (String peerName : peer.getAllPeerNames()) { + SmallLayeredNeuralNetworkMessage msg = new SmallLayeredNeuralNetworkMessage( + 0, isConverge, this.inMemoryModel.getWeightMatrices(), + this.inMemoryModel.getPrevMatricesUpdates()); + peer.send(peerName, msg); + } + } + +} Index: ml/src/main/java/org/apache/hama/ml/math/CrossEntropy.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/math/CrossEntropy.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/math/CrossEntropy.java (working copy) @@ -29,7 +29,12 @@ @Override public double apply(double target, double actual) { - return -target * Math.log(actual) - (1 - target) * Math.log(1 - actual); + double adjustedTarget = (target == 0 ? 0.000001 : target); + adjustedTarget = (target == 1.0 ? 0.999999 : target); + double adjustedActual = (actual == 0 ? 0.000001 : actual); + adjustedActual = (actual == 1 ? 
0.999999 : actual); + return -adjustedTarget * Math.log(adjustedActual) - (1 - adjustedTarget) + * Math.log(1 - adjustedActual); } @Override Index: ml/src/main/java/org/apache/hama/ml/math/IdentityFunction.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/math/IdentityFunction.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/math/IdentityFunction.java (working copy) @@ -30,7 +30,7 @@ @Override public double applyDerivative(double value) { - return 0; + return 1; } } Index: ml/src/main/java/org/apache/hama/ml/math/SquaredError.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/math/SquaredError.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/math/SquaredError.java (working copy) @@ -40,7 +40,6 @@ * {@inheritDoc} */ public double applyDerivative(double target, double actual) { - // return target - actual; return actual - target; } Index: ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/perception/MultiLayerPerceptron.java (working copy) @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hama.ml.ann.NeuralNetworkTrainer; import org.apache.hama.ml.math.DoubleDoubleFunction; import org.apache.hama.ml.math.DoubleFunction; import org.apache.hama.ml.math.DoubleVector; @@ -32,7 +33,7 @@ public abstract class MultiLayerPerceptron { /* The trainer for the model */ - protected PerceptronTrainer trainer; + protected NeuralNetworkTrainer trainer; /* The file path that contains the model meta-data */ protected String modelPath; @@ -55,8 +56,8 @@ * @param learningRate Larger learningRate makes MLP learn more aggressive. * Learning rate cannot be negative. * @param regularization Regularization makes MLP less likely to overfit. The - * value of regularization cannot be negative or too large, - * otherwise it will affect the precision. + * value of regularization cannot be negative or too large, otherwise + * it will affect the precision. * @param momentum The momentum makes the historical adjust have affect to * current adjust. The weight of momentum cannot be negative. * @param squashingFunctionName The name of squashing function. Index: ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/perception/SmallMLPTrainer.java (working copy) @@ -27,15 +27,15 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.ml.ann.NeuralNetworkTrainer; import org.apache.hama.ml.math.DenseDoubleMatrix; import org.apache.hama.ml.writable.VectorWritable; /** * The perceptron trainer for small scale MLP. 
*/ -class SmallMLPTrainer extends PerceptronTrainer { +class SmallMLPTrainer extends NeuralNetworkTrainer { - private static final Log LOG = LogFactory.getLog(SmallMLPTrainer.class); /* used by master only, check whether all slaves finishes reading */ private BitSet statusSet; @@ -51,6 +51,11 @@ protected void extraSetup( BSPPeer peer) { + // obtain parameters + this.trainingMode = conf.get("training.mode", "minibatch.gradient.descent"); + // mini-batch by default + this.batchSize = conf.getInt("training.batch.size", 100); + this.statusSet = new BitSet(peer.getConfiguration().getInt("tasks", 1)); String outputModelPath = conf.get("modelPath"); Index: ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java =================================================================== --- ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java (revision 1512864) +++ ml/src/main/java/org/apache/hama/ml/perception/SmallMultiLayerPerceptron.java (working copy) @@ -108,11 +108,11 @@ // add weights for bias this.weightMatrice[i] = new DenseDoubleMatrix(this.layerSizeArray[i] + 1, this.layerSizeArray[i + 1]); - + this.weightMatrice[i].applyToElements(new DoubleFunction() { private Random rnd = new Random(); - + @Override public double apply(double value) { return rnd.nextDouble() - 0.5; @@ -122,16 +122,16 @@ public double applyDerivative(double value) { throw new UnsupportedOperationException("Not supported"); } - + }); - -// int rowCount = this.weightMatrice[i].getRowCount(); -// int colCount = this.weightMatrice[i].getColumnCount(); -// for (int row = 0; row < rowCount; ++row) { -// for (int col = 0; col < colCount; ++col) { -// this.weightMatrice[i].set(row, col, rnd.nextDouble() - 0.5); -// } -// } + + // int rowCount = this.weightMatrice[i].getRowCount(); + // int colCount = this.weightMatrice[i].getColumnCount(); + // for (int row = 0; row < rowCount; ++row) { + // for (int col = 0; col < colCount; ++col) { + // this.weightMatrice[i].set(row, col, rnd.nextDouble() - 0.5); + // } + // } } } @@ -217,7 +217,8 @@ prevNeuronIdx, neuronIdx) * intermediateResult[prevNeuronIdx]; } // calculate via squashing function - results[neuronIdx + offset] = this.squashingFunction.apply(results[neuronIdx + offset]); + results[neuronIdx + offset] = this.squashingFunction + .apply(results[neuronIdx + offset]); } return results; @@ -273,8 +274,7 @@ delta[j] += this.regularization * derivativeRegularization; } - delta[j] *= this.squashingFunction - .applyDerivative(outputLayerOutput[j]); + delta[j] *= this.squashingFunction.applyDerivative(outputLayerOutput[j]); // calculate the weight update matrix between the last hidden layer and // the output layer @@ -323,8 +323,7 @@ double weight = this.weightMatrice[curLayerIdx].get(j, k); delta[j] += weight * nextLayerDelta[k]; } - delta[j] *= this.squashingFunction - .applyDerivative(curLayerOutput[j + 1]); + delta[j] *= this.squashingFunction.applyDerivative(curLayerOutput[j + 1]); // calculate the weight update matrix between the previous layer and the // current layer @@ -332,7 +331,7 @@ double updatedValue = -this.learningRate * delta[j] * prevLayerOutput[i]; // add momemtum - updatedValue += this.momentum * prevWeightUpdateMatrix.get(i, j); + // updatedValue += this.momentum * prevWeightUpdateMatrix.get(i, j); weightUpdateMatrices[prevLayerIdx].set(i, j, updatedValue); } } Index: ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetwork.java =================================================================== --- 
ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetwork.java (revision 0) +++ ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetwork.java (working copy) @@ -0,0 +1,542 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hama.ml.ann.AbstractLayeredNeuralNetwork.TrainingMethod; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DenseDoubleVector; +import org.apache.hama.ml.math.DoubleMatrix; +import org.apache.hama.ml.math.DoubleVector; +import org.apache.hama.ml.math.FunctionFactory; +import org.apache.hama.ml.writable.VectorWritable; +import org.junit.Test; +import org.mortbay.log.Log; + +/** + * Test the functionality of SmallLayeredNeuralNetwork. 
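+ * Includes tests for model serialization, the forward pass, and training on the XOR problem.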
+ * + */ +public class TestSmallLayeredNeuralNetwork { + + @Test + public void testReadWrite() { + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.addLayer(2, false, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.addLayer(5, false, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.addLayer(1, true, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + double learningRate = 0.2; + ann.setLearningRate(learningRate); + double momentumWeight = 0.5; + ann.setMomemtumWeight(momentumWeight); + double regularizationWeight = 0.05; + ann.setRegularizationWeight(regularizationWeight); + // intentionally initialize the weights to fixed values (0.2 and 0.8) + DoubleMatrix[] matrices = new DenseDoubleMatrix[2]; + matrices[0] = new DenseDoubleMatrix(5, 3, 0.2); + matrices[1] = new DenseDoubleMatrix(1, 6, 0.8); + ann.setWeightMatrices(matrices); + + // write to file + String modelPath = "/tmp/testSmallLayeredNeuralNetworkReadWrite"; + ann.setModelPath(modelPath); + try { + ann.writeModelToFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + // read from file + SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(modelPath); + assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType()); + assertEquals(modelPath, annCopy.getModelPath()); + assertEquals(learningRate, annCopy.getLearningRate(), 0.000001); + assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001); + assertEquals(regularizationWeight, annCopy.getRegularizationWeight(), + 0.000001); + assertEquals(TrainingMethod.GRADIATE_DESCENT, annCopy.getTrainingMethod()); + + // compare weights + DoubleMatrix[] weightsMatrices = annCopy.getWeightMatrices(); + for (int i = 0; i < weightsMatrices.length; ++i) { + DoubleMatrix expectMat = matrices[i]; + DoubleMatrix actualMat = weightsMatrices[i]; + for (int j = 0; j < expectMat.getRowCount(); ++j) { + for (int k = 0; k < expectMat.getColumnCount(); ++k) { + assertEquals(expectMat.get(j, k), actualMat.get(j, k), 0.000001); + } + } + } + } + + @Test + /** + * Test the forward functionality. 
+ */ + public void testOutput() { + // first network + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.addLayer(2, false, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.addLayer(5, false, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.addLayer(1, true, + FunctionFactory.createDoubleFunction("IdentityFunction")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann.setLearningRate(0.1); + // intentionally initialize all weights to 0.5 + DoubleMatrix[] matrices = new DenseDoubleMatrix[2]; + matrices[0] = new DenseDoubleMatrix(5, 3, 0.5); + matrices[1] = new DenseDoubleMatrix(1, 6, 0.5); + ann.setWeightMatrices(matrices); + + double[] arr = new double[] { 0, 1 }; + DoubleVector training = new DenseDoubleVector(arr); + DoubleVector result = ann.getOutput(training); + assertEquals(1, result.getDimension()); + // assertEquals(3, result.get(0), 0.000001); + + // second network + SmallLayeredNeuralNetwork ann2 = new SmallLayeredNeuralNetwork(); + ann2.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann2.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann2.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann2.setLearningRate(0.3); + // intentionally initialize all weights to 0.5 + DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2]; + matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5); + matrices2[1] = new DenseDoubleMatrix(1, 4, 0.5); + ann2.setWeightMatrices(matrices2); + + double[] test = { 0, 0 }; + double[] result2 = { 0.807476 }; + + DoubleVector vec = ann2.getOutput(new DenseDoubleVector(test)); + assertArrayEquals(result2, vec.toArray(), 0.000001); + + SmallLayeredNeuralNetwork ann3 = new SmallLayeredNeuralNetwork(); + ann3.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann3.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann3.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann3.setLearningRate(0.3); + // intentionally initialize all weights to 0.5 + DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2]; + initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5); + initMatrices[1] = new DenseDoubleMatrix(1, 4, 0.5); + ann3.setWeightMatrices(initMatrices); + + double[] instance = { 0, 1 }; + DoubleVector output = ann3.getOutput(new DenseDoubleVector(instance)); + assertEquals(0.8315410, output.get(0), 0.000001); + } + + @Test + public void testXORlocal() { + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann.setLearningRate(0.5); + ann.setMomemtumWeight(0.0); + + int iterations = 50000; // iteration should be set to a very large number + double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } }; + for (int i = 0; i < iterations; ++i) { + DoubleMatrix[] matrices = null; + for (int j = 0; j < instances.length; ++j) { + matrices = ann.trainByInstance(new DenseDoubleVector(instances[j + % instances.length])); + ann.updateWeightMatrices(matrices); + } + } + 
+ for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + // the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, ann.getOutput(input).get(0), 0.1); + } + + // write model into file and read out + String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocal"; + ann.setModelPath(modelPath); + try { + ann.writeModelToFile(); + } catch (IOException e) { + e.printStackTrace(); + } + SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(modelPath); + // test on instances + for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + // the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, annCopy.getOutput(input).get(0), 0.1); + } + } + + @Test + public void testXORWithMomentum() { + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann.setLearningRate(0.6); + ann.setMomemtumWeight(0.3); + + int iterations = 2000; // iteration should be set to a very large number + double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } }; + for (int i = 0; i < iterations; ++i) { + for (int j = 0; j < instances.length; ++j) { + ann.trainOnline(new DenseDoubleVector(instances[j % instances.length])); + } + } + + for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + // the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, ann.getOutput(input).get(0), 0.1); + } + + // write model into file and read out + String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithMomentum"; + ann.setModelPath(modelPath); + try { + ann.writeModelToFile(); + } catch (IOException e) { + e.printStackTrace(); + } + SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(modelPath); + // test on instances + for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + // the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, annCopy.getOutput(input).get(0), 0.1); + } + } + + @Test + public void testXORLocalWithRegularization() { + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("SquaredError")); + ann.setLearningRate(0.7); + ann.setMomemtumWeight(0.5); + ann.setRegularizationWeight(0.002); + + int iterations = 5000; // iteration should be set to a very large number + double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } }; + for (int i = 0; i < iterations; ++i) { + for (int j = 0; j < instances.length; ++j) { + ann.trainOnline(new DenseDoubleVector(instances[j % instances.length])); + } + } + + for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + 
// the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, ann.getOutput(input).get(0), 0.05); + } + + // write model into file and read out + String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithRegularization"; + ann.setModelPath(modelPath); + try { + ann.writeModelToFile(); + } catch (IOException e) { + e.printStackTrace(); + } + SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(modelPath); + // test on instances + for (int i = 0; i < instances.length; ++i) { + DoubleVector input = new DenseDoubleVector(instances[i]).slice(2); + // the expected output is the last element in array + double result = instances[i][2]; + assertEquals(result, annCopy.getOutput(input).get(0), 0.05); + } + } + + @Test + public void testTwoClassClassification() { + // use logistic regression data + String filepath = "src/test/resources/logistic_regression_data.txt"; + List<double[]> instanceList = new ArrayList<double[]>(); + + try { + BufferedReader br = new BufferedReader(new FileReader(filepath)); + String line = null; + while ((line = br.readLine()) != null) { + String[] tokens = line.trim().split(","); + double[] instance = new double[tokens.length]; + for (int i = 0; i < tokens.length; ++i) { + instance[i] = Double.parseDouble(tokens[i]); + } + instanceList.add(instance); + } + br.close(); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + int dimension = instanceList.get(0).length - 1; + + // min-max normalization + double[] mins = new double[dimension]; + double[] maxs = new double[dimension]; + Arrays.fill(mins, Double.MAX_VALUE); + // use -Double.MAX_VALUE: Double.MIN_VALUE is the smallest positive double, + // so it would break the max computation for negative feature values + Arrays.fill(maxs, -Double.MAX_VALUE); + + for (double[] instance : instanceList) { + for (int i = 0; i < instance.length - 1; ++i) { + if (mins[i] > instance[i]) { + mins[i] = instance[i]; + } + if (maxs[i] < instance[i]) { + maxs[i] = instance[i]; + } + } + } + + for (double[] instance : instanceList) { + for (int i = 0; i < instance.length - 1; ++i) { + double range = maxs[i] - mins[i]; + if (range != 0) { + instance[i] = (instance[i] - mins[i]) / range; + } + } + } + + // divide dataset into training and testing + List<double[]> testInstances = new ArrayList<double[]>(); + testInstances.addAll(instanceList.subList(instanceList.size() - 100, + instanceList.size())); + List<double[]> trainingInstances = instanceList.subList(0, + instanceList.size() - 100); + + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.setLearningRate(0.001); + ann.setMomemtumWeight(0.1); + ann.setRegularizationWeight(0.01); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("CrossEntropy")); + + long start = new Date().getTime(); + int iterations = 1000; + for (int i = 0; i < iterations; ++i) { + for (double[] trainingInstance : trainingInstances) { + ann.trainOnline(new DenseDoubleVector(trainingInstance)); + } + } + long end = new Date().getTime(); + Log.info(String.format("Training time: %fs\n", + (double) (end - start) / 1000)); + + double errorRate = 0; + // calculate the error on the test instances + for (double[] testInstance : testInstances) { + DoubleVector instance = new DenseDoubleVector(testInstance); + double expected = 
instance.get(instance.getDimension() - 1); + instance = instance.slice(instance.getDimension() - 1); + double actual = ann.getOutput(instance).get(0); + if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) { + ++errorRate; + } + } + errorRate /= testInstances.size(); + + Log.info(String.format("Relative error: %f%%\n", errorRate * 100)); + } + + @Test + public void testDistributedVersion() { + // write data into a sequence file + String tmpStrDatasetPath = "/tmp/logistic_regression_data"; + Path tmpDatasetPath = new Path(tmpStrDatasetPath); + String strDataPath = "src/test/resources/logistic_regression_data.txt"; + String modelPath = "/tmp/distributed-model"; + + Configuration conf = new Configuration(); + List<double[]> instanceList = new ArrayList<double[]>(); + List<double[]> trainingInstances = null; + List<double[]> testInstances = null; + + try { + FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf); + // remove any stale dataset and recreate an empty file + fs.delete(tmpDatasetPath, true); + if (!fs.exists(tmpDatasetPath)) { + fs.createNewFile(tmpDatasetPath); + } + + BufferedReader br = new BufferedReader(new FileReader(strDataPath)); + String line = null; + int count = 0; + while ((line = br.readLine()) != null) { + String[] tokens = line.trim().split(","); + double[] instance = new double[tokens.length]; + for (int i = 0; i < tokens.length; ++i) { + instance[i] = Double.parseDouble(tokens[i]); + } + instanceList.add(instance); + } + br.close(); + + int dimension = instanceList.get(0).length - 1; + // min-max normalization + double[] mins = new double[dimension]; + double[] maxs = new double[dimension]; + Arrays.fill(mins, Double.MAX_VALUE); + // -Double.MAX_VALUE, not Double.MIN_VALUE (the smallest positive double) + Arrays.fill(maxs, -Double.MAX_VALUE); + + for (double[] instance : instanceList) { + for (int i = 0; i < instance.length - 1; ++i) { + mins[i] = Math.min(mins[i], instance[i]); + maxs[i] = Math.max(maxs[i], instance[i]); + } + } + + for (double[] instance : instanceList) { + for (int i = 0; i < instance.length - 1; ++i) { + double range = maxs[i] - mins[i]; + if (range != 0) { + instance[i] = (instance[i] - mins[i]) / range; + } + } + } + + // write the training data to a temporary sequence file + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, + tmpDatasetPath, LongWritable.class, VectorWritable.class); + int testSize = 150; + + Collections.shuffle(instanceList); + testInstances = new ArrayList<double[]>(); + testInstances.addAll(instanceList.subList(instanceList.size() - testSize, + instanceList.size())); + trainingInstances = instanceList.subList(0, instanceList.size() + - testSize); + + for (double[] instance : trainingInstances) { + DoubleVector vec = new DenseDoubleVector(instance); + writer.append(new LongWritable(count++), new VectorWritable(vec)); + } + writer.close(); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + + // create model + int dimension = 8; + SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(); + ann.setLearningRate(0.7); + ann.setMomemtumWeight(0.5); + ann.setRegularizationWeight(0.1); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(dimension, false, + FunctionFactory.createDoubleFunction("Sigmoid")); + ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid")); + ann.setCostFunction(FunctionFactory + .createDoubleDoubleFunction("CrossEntropy")); + ann.setModelPath(modelPath); + + long start = new 
Date().getTime(); + Map trainingParameters = new HashMap(); + trainingParameters.put("tasks", "5"); + trainingParameters.put("training.max.iterations", "2000"); + trainingParameters.put("training.batch.size", "300"); + trainingParameters.put("convergence.check.interval", "1000"); + ann.train(tmpDatasetPath, trainingParameters); + + long end = new Date().getTime(); + + // validate results + double errorRate = 0; + // calculate the error on test instance + for (double[] testInstance : testInstances) { + DoubleVector instance = new DenseDoubleVector(testInstance); + double expected = instance.get(instance.getDimension() - 1); + instance = instance.slice(instance.getDimension() - 1); + double actual = ann.getOutput(instance).get(0); + if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) { + ++errorRate; + } + } + errorRate /= testInstances.size(); + + Log.info(String.format("Training time: %fs\n", + (double) (end - start) / 1000)); + Log.info(String.format("Relative error: %f%%\n", errorRate * 100)); + } + +} Index: ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetworkMessage.java =================================================================== --- ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetworkMessage.java (revision 0) +++ ml/src/test/java/org/apache/hama/ml/ann/TestSmallLayeredNeuralNetworkMessage.java (working copy) @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.ann; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.ml.math.DenseDoubleMatrix; +import org.apache.hama.ml.math.DoubleMatrix; +import org.junit.Test; + +/** + * Test the functionalities of SmallLayeredNeuralNetworkMessage. 
+ * + */ +public class TestSmallLayeredNeuralNetworkMessage { + + @Test + public void testReadWriteWithoutPrev() { + double error = 0.22; + double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 }, + { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } }; + double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } }; + DoubleMatrix[] matrices = new DoubleMatrix[2]; + matrices[0] = new DenseDoubleMatrix(matrix1); + matrices[1] = new DenseDoubleMatrix(matrix2); + + boolean isConverge = false; + + SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage( + error, isConverge, matrices, null); + Configuration conf = new Configuration(); + String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessage"; + Path path = new Path(strPath); + try { + FileSystem fs = FileSystem.get(new URI(strPath), conf); + FSDataOutputStream out = fs.create(path); + message.write(out); + out.close(); + + FSDataInputStream in = fs.open(path); + SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage( + 0, isConverge, null, null); + readMessage.readFields(in); + in.close(); + assertEquals(error, readMessage.getTrainingError(), 0.000001); + assertFalse(readMessage.isConverge()); + DoubleMatrix[] readMatrices = readMessage.getCurMatrices(); + assertEquals(2, readMatrices.length); + for (int i = 0; i < readMatrices.length; ++i) { + double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i]) + .getValues(); + double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i]) + .getValues(); + for (int r = 0; r < doubleMatrices.length; ++r) { + assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001); + } + } + + DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices(); + assertNull(readPrevMatrices); + + // delete + fs.delete(path, true); + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + + @Test + public void testReadWriteWithPrev() { + double error = 0.22; + boolean isConverge = true; + + double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 }, + { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } }; + double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } }; + DoubleMatrix[] matrices = new DoubleMatrix[2]; + matrices[0] = new DenseDoubleMatrix(matrix1); + matrices[1] = new DenseDoubleMatrix(matrix2); + + double[][] prevMatrix1 = new double[][] { { 0.1, 0.1, 0.2, 0.3 }, + { 0.2, 0.4, 0.1, 0.5 }, { 0.5, 0.1, 0.5, 0.2 } }; + double[][] prevMatrix2 = new double[][] { { 0.1, 0.2, 0.5, 0.9 }, + { 0.3, 0.5, 0.2, 0.6 }, { 0.6, 0.8, 0.7, 0.5 } }; + + DoubleMatrix[] prevMatrices = new DoubleMatrix[2]; + prevMatrices[0] = new DenseDoubleMatrix(prevMatrix1); + prevMatrices[1] = new DenseDoubleMatrix(prevMatrix2); + + SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage( + error, isConverge, matrices, prevMatrices); + Configuration conf = new Configuration(); + String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessageWithPrev"; + Path path = new Path(strPath); + try { + FileSystem fs = FileSystem.get(new URI(strPath), conf); + FSDataOutputStream out = fs.create(path); + message.write(out); + out.close(); + + FSDataInputStream in = fs.open(path); + SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage( + 0, isConverge, null, null); + readMessage.readFields(in); + in.close(); + + assertTrue(readMessage.isConverge()); + + DoubleMatrix[] readMatrices = readMessage.getCurMatrices(); + assertEquals(2, readMatrices.length); + for (int 
i = 0; i < readMatrices.length; ++i) { + double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i]) + .getValues(); + double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i]) + .getValues(); + for (int r = 0; r < doubleMatrices.length; ++r) { + assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001); + } + } + + DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices(); + assertEquals(2, readPrevMatrices.length); + for (int i = 0; i < readPrevMatrices.length; ++i) { + double[][] doubleMatrices = ((DenseDoubleMatrix) readPrevMatrices[i]) + .getValues(); + double[][] doubleExpected = ((DenseDoubleMatrix) prevMatrices[i]) + .getValues(); + for (int r = 0; r < doubleMatrices.length; ++r) { + assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001); + } + } + + // delete + fs.delete(path, true); + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + +} Index: ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java =================================================================== --- ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java (revision 1512864) +++ ml/src/test/java/org/apache/hama/ml/perception/TestSmallMultiLayerPerceptron.java (working copy) @@ -40,7 +40,6 @@ import org.apache.hama.ml.math.DoubleVector; import org.apache.hama.ml.writable.MatrixWritable; import org.apache.hama.ml.writable.VectorWritable; -import org.junit.Ignore; import org.junit.Test; public class TestSmallMultiLayerPerceptron { @@ -305,10 +304,9 @@ e.printStackTrace(); } } - + /** - * Test training with momentum. - * The MLP can converge faster. + * Test training with momentum. The MLP can converge faster. */ @Test public void testWithMomentum() { @@ -359,7 +357,6 @@ * Test the XOR problem. */ @Test - @Ignore public void testTrainingByXOR() { // write in some training instances Configuration conf = new Configuration();
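For reference, the snippet below is a minimal local-mode usage sketch distilled from the new TestSmallLayeredNeuralNetwork tests above. It only uses calls that appear in this patch (addLayer, setCostFunction, setLearningRate, setMomemtumWeight, trainOnline, getOutput); the XorExample class name, the epoch count, and the hyper-parameter values are illustrative assumptions, not part of the patch.

import org.apache.hama.ml.ann.SmallLayeredNeuralNetwork;
import org.apache.hama.ml.math.DenseDoubleVector;
import org.apache.hama.ml.math.DoubleVector;
import org.apache.hama.ml.math.FunctionFactory;

public class XorExample {
  public static void main(String[] args) {
    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
    // 2 input neurons, 3 hidden neurons, 1 output neuron; bias neurons are
    // added automatically for the non-final layers
    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.setCostFunction(FunctionFactory
        .createDoubleDoubleFunction("SquaredError"));
    ann.setLearningRate(0.5);
    ann.setMomemtumWeight(0.3);

    // each training instance is the feature vector concatenated with the label
    double[][] instances = { { 0, 0, 0 }, { 0, 1, 1 }, { 1, 0, 1 }, { 1, 1, 0 } };
    for (int epoch = 0; epoch < 10000; ++epoch) {
      for (double[] instance : instances) {
        ann.trainOnline(new DenseDoubleVector(instance));
      }
    }

    // query the trained model: slice off the label before feeding the features
    DoubleVector output = ann.getOutput(new DenseDoubleVector(instances[1])
        .slice(2));
    System.out.println("XOR(0,1) ~ " + output.get(0));
  }
}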