Index: core/src/main/java/org/apache/hama/pipes/PipesApplication.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesApplication.java (revision 1526847) +++ core/src/main/java/org/apache/hama/pipes/PipesApplication.java (working copy) @@ -213,9 +213,6 @@ LOG.debug("DEBUG: cmd: " + cmd); process = runClient(cmd, env); // fork c++ binary - LOG.debug("DEBUG: waiting for Client at " - + serverSocket.getLocalSocketAddress()); - try { if (!streamingEnabled) { LOG.debug("DEBUG: waiting for Client at " Index: core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (revision 1526847) +++ core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (working copy) @@ -315,10 +315,10 @@ WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code); if (pair != null) { - binProtocol.writeObject(pair.getKey()); - binProtocol.writeObject(pair.getValue()); + binProtocol.writeObject(new Text(pair.getKey().toString())); + String valueStr = pair.getValue().toString(); + binProtocol.writeObject(new Text(valueStr)); - String valueStr = pair.getValue().toString(); LOG.debug("Responded MessageType.READ_KEYVALUE - Key: " + pair.getKey() + " Value: " Index: core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (revision 1526847) +++ core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (working copy) @@ -35,8 +35,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.sync.SyncException; -import org.apache.hama.pipes.protocol.UplinkReader; -import org.apache.hama.pipes.protocol.StreamingProtocol.StreamingUplinkReaderThread; import 
org.apache.hama.util.KeyValuePair; /** Index: c++/src/main/native/examples/README.txt =================================================================== --- c++/src/main/native/examples/README.txt (revision 0) +++ c++/src/main/native/examples/README.txt (revision 0) @@ -0,0 +1,113 @@ +#################################################################### +# Hama Pipes Examples # +#################################################################### +# - Summation # +# - PiEstimator # +# - MatrixMultiplication # +#################################################################### + +To run the examples, first compile them: + +% mvn install + +and then copy the binaries to dfs: + +% hadoop fs -put c++/target/native/examples/summation \ + /examples/bin/summation + +create an input directory with text files: + +% hadoop fs -put my-data in-dir + +and run the word count example: + +% hama pipes \ + -conf c++/src/main/native/examples/conf/summation.xml \ + -input in-dir -output out-dir + +#################################################################### + +# Summation Example + +% hadoop fs -put c++/target/native/examples/summation \ + /examples/bin/summation + +% echo -e "key1\t1.0\nkey2\t2.0\nkey3\t3.0\nkey4\t4.0\n\ +key5\t5.0\nkey6\t6.0\nkey7\t7.0\nkey8\t8.0\nkey9\t9.0\n\ +key10\t10.0" > summation.txt && hadoop fs -put summation.txt \ + /examples/input/summation/input.txt && rm summation.txt + +% hama pipes \ + -conf c++/src/main/native/examples/conf/summation.xml \ + -input /examples/input/summation \ + -output /examples/output/summation + +% hadoop fs -cat /examples/input/summation/input.txt +% hadoop fs -cat /examples/output/summation/part-00000 + +% hadoop fs -rmr /examples/input/summation +% hadoop fs -rmr /examples/output/summation + +#################################################################### + +# PiEstimator Example + +% hadoop fs -put c++/target/native/examples/piestimator \ + /examples/bin/piestimator + +% hama pipes \ + -conf 
c++/src/main/native/examples/conf/piestimator.xml \ + -output /examples/output/piestimator + +% hadoop fs -cat /examples/output/piestimator/part-00001 + +% hadoop fs -rmr /examples/output/piestimator + +#################################################################### + +# MatrixMultiplication Example + +% hadoop fs -put c++/target/native/examples/matrixmultiplication \ + /examples/bin/matrixmultiplication + +% hadoop fs -put c++/src/main/native/examples/input/MatrixA.seq \ + /examples/input/matrixmultiplication/MatrixA.seq + +% hadoop fs -put \ + c++/src/main/native/examples/input/MatrixB_transposed.seq \ + /examples/input/matrixmultiplication/MatrixB_transposed.seq + +% hama pipes \ + -conf c++/src/main/native/examples/conf/matrixmultiplication.xml \ + -output /examples/output/matrixmultiplication + +% hama seqdumper \ + -seqFile /examples/input/matrixmultiplication/MatrixA.seq + +% hama seqdumper \ + -seqFile /examples/input/matrixmultiplication/MatrixB_transposed.seq + +% hadoop fs -cat /examples/output/matrixmultiplication/part-00000 + +# Matrix A +# 9 4 1 9 +# 1 8 6 3 +# 8 3 3 9 +# 7 1 9 6 + +# Matrix B (not transposed) +# 2 1 6 5 +# 7 8 9 5 +# 2 1 5 8 +# 7 4 4 9 + +# Resulting Matrix C +# 111.0 78.0 131.0 154.0 +# 91.0 83.0 120.0 120.0 +# 106.0 71.0 126.0 160.0 +# 81.0 48.0 120.0 166.0 + +% hadoop fs -rmr /examples/output/matrixmultiplication + +#################################################################### + Index: c++/src/main/native/examples/input/MatrixA.seq =================================================================== Cannot display: file marked as a binary type. 
svn:mime-type = application/octet-stream Property changes on: c++/src/main/native/examples/input/MatrixA.seq ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: c++/src/main/native/examples/input/MatrixB_transposed.seq =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: c++/src/main/native/examples/input/MatrixB_transposed.seq ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: c++/src/main/native/examples/impl/DenseDoubleVector.cc =================================================================== --- c++/src/main/native/examples/impl/DenseDoubleVector.cc (revision 0) +++ c++/src/main/native/examples/impl/DenseDoubleVector.cc (revision 0) @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hadoop/StringUtils.hh" +#include "hadoop/Splitter.hh" +#include "DenseDoubleVector.hh" + +#include +#include +#include +#include +#include + +using std::string; +using std::cout; +using HadoopUtils::Splitter; + +namespace math { + + DenseDoubleVector::DenseDoubleVector(int len) : size(len), vector(new double[len]) { + } + + DenseDoubleVector::DenseDoubleVector(int len, double val) : size(len), vector(new double[len]) { + for (int i=0; igetLength()); + for (int i = 0; i < v->getLength(); i++) { + newv->set(i, this->get(i) + v->get(i)); + } + return newv; + } + + DenseDoubleVector* DenseDoubleVector::add(double scalar) { + DenseDoubleVector *newv = new DenseDoubleVector(this->getLength()); + for (int i = 0; i < this->getLength(); i++) { + newv->set(i, this->get(i) + scalar); + } + return newv; + } + + DenseDoubleVector* DenseDoubleVector::subtract(DenseDoubleVector *v) { + DenseDoubleVector *newv = new DenseDoubleVector(v->getLength()); + for (int i = 0; i < v->getLength(); i++) { + newv->set(i, this->get(i) - v->get(i)); + } + return newv; + } + + DenseDoubleVector* DenseDoubleVector::subtract(double v) { + DenseDoubleVector *newv = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + newv->set(i, vector[i] - v); + } + return newv; + } + + DenseDoubleVector* DenseDoubleVector::subtractFrom(double v) { + DenseDoubleVector *newv = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + newv->set(i, v - vector[i]); + } + return newv; + } + + DenseDoubleVector* DenseDoubleVector::multiply(double scalar) { + DenseDoubleVector *v = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + v->set(i, this->get(i) * scalar); + } + return v; + } + + DenseDoubleVector* DenseDoubleVector::divide(double scalar) { + DenseDoubleVector *v = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + v->set(i, this->get(i) / scalar); + } + return v; + } + + /* + DenseDoubleVector* pow(int x) { + DenseDoubleVector *v = new 
DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + double value = 0.0; + // it is faster to multiply when we having ^2 + if (x == 2) { + value = vector[i] * vector[i]; + } + else { + value = pow(vector[i], x); + } + v->set(i, value); + } + return v; + } + + DenseDoubleVector* sqrt() { + DenseDoubleVector *v = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + v->set(i, sqrt(vector[i])); + } + return v; + } + */ + + double DenseDoubleVector::sum() { + double sum = 0.0; + for (int i = 0; i < size; i++) { + sum += vector[i]; + } + return sum; + } + + /* + DenseDoubleVector* abs() { + DenseDoubleVector *v = new DenseDoubleVector(size); + for (int i = 0; i < size; i++) { + v->set(i, abs(vector[i])); + } + return v; + } + */ + + double DenseDoubleVector::dot(DenseDoubleVector *s) { + double dotProduct = 0.0; + for (int i = 0; i < size; i++) { + dotProduct += this->get(i) * s->get(i); + } + return dotProduct; + } + + double DenseDoubleVector::max() { + double max = std::numeric_limits::min(); + for (int i = 0; i < size; i++) { + double d = vector[i]; + if (d > max) { + max = d; + } + } + return max; + } + + int DenseDoubleVector::maxIndex() { + double max = std::numeric_limits::min(); + int maxIndex = 0; + for (int i = 0; i < size; i++) { + double d = vector[i]; + if (d > max) { + max = d; + maxIndex = i; + } + } + return maxIndex; + } + + double DenseDoubleVector::min() { + double min = std::numeric_limits::max(); + for (int i = 0; i < size; i++) { + double d = vector[i]; + if (d < min) { + min = d; + } + } + return min; + } + + int DenseDoubleVector::minIndex() { + double min = std::numeric_limits::max(); + int minIndex = 0; + for (int i = 0; i < size; i++) { + double d = vector[i]; + if (d < min) { + min = d; + minIndex = i; + } + } + return minIndex; + } + + double* DenseDoubleVector::toArray() { + return vector; + } + + string DenseDoubleVector::toString() { + string str; + string delimiter = ","; + for (int i = 0; i < size; i++) + if (i==0) 
+ str += HadoopUtils::toString(vector[i]); + else + str += delimiter + HadoopUtils::toString(vector[i]); + + return str; + } +} + Index: c++/src/main/native/examples/impl/summation.cc =================================================================== --- c++/src/main/native/examples/impl/summation.cc (revision 0) +++ c++/src/main/native/examples/impl/summation.cc (revision 0) @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hama/Pipes.hh" +#include "hama/TemplateFactory.hh" +#include "hadoop/StringUtils.hh" + +#include +#include +#include + +using std::string; + +using HamaPipes::BSP; +using HamaPipes::BSPContext; + +class SummationBSP: public BSP { +private: + string masterTask; +public: + SummationBSP(BSPContext& context) { } + + void setup(BSPContext& context) { + // Choose one as a master + masterTask = context.getPeerName(context.getNumPeers() / 2); + } + + void bsp(BSPContext& context) { + + double intermediateSum = 0.0; + string key; + string value; + + while(context.readNext(key,value)) { + intermediateSum += HadoopUtils::toDouble(value); + } + + context.sendMessage(masterTask, HadoopUtils::toString(intermediateSum)); + context.sync(); + } + + void cleanup(BSPContext& context) { + if (context.getPeerName().compare(masterTask)==0) { + + double sum = 0.0; + int msgCount = context.getNumCurrentMessages(); + for (int i=0; i()); +} + Index: c++/src/main/native/examples/impl/piestimator.cc =================================================================== --- c++/src/main/native/examples/impl/piestimator.cc (revision 0) +++ c++/src/main/native/examples/impl/piestimator.cc (revision 0) @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hama/Pipes.hh" +#include "hama/TemplateFactory.hh" +#include "hadoop/StringUtils.hh" + +#include +#include +#include +#include +#include + +using std::string; +using std::cout; + +using HamaPipes::BSP; +using HamaPipes::BSPContext; +using namespace HadoopUtils; + +class PiEstimatorBSP: public BSP { +private: + string masterTask; + long iterations; // iterations_per_bsp_task +public: + PiEstimatorBSP(BSPContext& context) { + iterations = 1000000L; + } + + inline double closed_interval_rand(double x0, double x1) { + return x0 + (x1 - x0) * rand() / ((double) RAND_MAX); + } + + void setup(BSPContext& context) { + // Choose one as a master + masterTask = context.getPeerName(context.getNumPeers() / 2); + } + + void bsp(BSPContext& context) { + + /* initialize random seed */ + srand(time(NULL)); + + int in = 0; + for (long i = 0; i < iterations; i++) { + double x = 2.0 * closed_interval_rand(0, 1) - 1.0; + double y = 2.0 * closed_interval_rand(0, 1) - 1.0; + if (sqrt(x * x + y * y) < 1.0) { + in++; + } + } + + context.sendMessage(masterTask, toString(in)); + context.sync(); + } + + void cleanup(BSPContext& context) { + if (context.getPeerName().compare(masterTask)==0) { + + long totalHits = 0; + int msgCount = context.getNumCurrentMessages(); + string received; + for (int i=0; i()); +} + Index: c++/src/main/native/examples/impl/matrixmultiplication.cc =================================================================== --- c++/src/main/native/examples/impl/matrixmultiplication.cc (revision 0) +++ c++/src/main/native/examples/impl/matrixmultiplication.cc (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hama/Pipes.hh" +#include "hama/TemplateFactory.hh" +#include "hadoop/StringUtils.hh" +#include "DenseDoubleVector.hh" + +#include +#include +#include +#include +#include +#include + +using std::string; +using std::cout; + +using HamaPipes::BSP; +using HamaPipes::BSPJob; +using HamaPipes::Partitioner; +using HamaPipes::BSPContext; +using namespace HadoopUtils; + +using math::DenseDoubleVector; + +class MatrixMultiplicationBSP: public BSP { +private: + string masterTask; + int seqFileID; + string HAMA_MAT_MULT_B_PATH; +public: + MatrixMultiplicationBSP(BSPContext& context) { + seqFileID = 0; + HAMA_MAT_MULT_B_PATH = "hama.mat.mult.B.path"; + } + + void setup(BSPContext& context) { + // Choose one as a master + masterTask = context.getPeerName(context.getNumPeers() / 2); + + reopenMatrixB(context); + } + + void bsp(BSPContext& context) { + + string aRowKey; + string aRowVectorStr; + // while for each row of matrix A + while(context.readNext(aRowKey, aRowVectorStr)) { + + DenseDoubleVector *aRowVector = new DenseDoubleVector(aRowVectorStr); + DenseDoubleVector *colValues = NULL; + + string bColKey; + string bColVectorStr; + + // while for each col of matrix B + while (context.sequenceFileReadNext(seqFileID,bColKey,bColVectorStr)) { + + DenseDoubleVector *bColVector = new DenseDoubleVector(bColVectorStr); + + if (colValues == NULL) + colValues = new 
DenseDoubleVector(bColVector->getDimension()); + + double dot = aRowVector->dot(bColVector); + + colValues->set(toInt(bColKey), dot); + } + + // Submit one calculated row + std::stringstream message; + message << aRowKey << ":" << colValues->toString(); + context.sendMessage(masterTask, message.str()); + + reopenMatrixB(context); + } + + context.sequenceFileClose(seqFileID); + context.sync(); + } + + void cleanup(BSPContext& context) { + if (context.getPeerName().compare(masterTask)==0) { + + int msgCount = context.getNumCurrentMessages(); + + for (int i=0; iget(HAMA_MAT_MULT_B_PATH); + + seqFileID = context.sequenceFileOpen(path,"r", + "org.apache.hadoop.io.IntWritable", + "org.apache.hama.ml.writable.VectorWritable"); + } + +}; + +class MatrixRowPartitioner: public Partitioner { +public: + MatrixRowPartitioner(BSPContext& context) { } + + int partition(const string& key,const string& value, int32_t numTasks) { + return toInt(key) % numTasks; + } +}; + +int main(int argc, char *argv[]) { + return HamaPipes::runTask(HamaPipes::TemplateFactory()); +} Index: c++/src/main/native/examples/impl/DenseDoubleVector.hh =================================================================== --- c++/src/main/native/examples/impl/DenseDoubleVector.hh (revision 0) +++ c++/src/main/native/examples/impl/DenseDoubleVector.hh (revision 0) @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +using std::string; + +namespace math { + + class DenseDoubleVector { + private: + double *vector; + int size; + public: + DenseDoubleVector(); // Default-Constructor + DenseDoubleVector(int length); + /// Creates a new vector with the given length and default value. + DenseDoubleVector(int length, double val); + // Creates a new vector with the given array. + DenseDoubleVector(double arr[]); + DenseDoubleVector(const string values); + ~DenseDoubleVector(); // Destructor + + int getLength(); + int getDimension(); + void set(int index, double value); + double get(int index); + + DenseDoubleVector *add(DenseDoubleVector *v); + DenseDoubleVector *add(double scalar); + DenseDoubleVector *subtract(DenseDoubleVector *v); + DenseDoubleVector *subtract(double v); + DenseDoubleVector *subtractFrom(double v); + + DenseDoubleVector *multiply(double scalar); + DenseDoubleVector *divide(double scalar); + /* + DenseDoubleVector *pow(int x); + DenseDoubleVector *sqrt(); + */ + double sum(); + + //DenseDoubleVector *abs(); + double dot(DenseDoubleVector *s); + + double max(); + int maxIndex(); + double min(); + int minIndex(); + + double *toArray(); + virtual string toString(); + }; +} + Index: c++/src/main/native/examples/conf/summation.xml =================================================================== --- c++/src/main/native/examples/conf/summation.xml (revision 0) +++ c++/src/main/native/examples/conf/summation.xml (revision 0) @@ -0,0 +1,44 @@ + + + + + + hama.pipes.executable + hdfs:/examples/bin/summation + + + 
hama.pipes.java.recordreader + true + + + hama.pipes.java.recordwriter + true + + + bsp.input.format.class + org.apache.hama.bsp.KeyValueTextInputFormat + + + bsp.output.format.class + org.apache.hama.bsp.TextOutputFormat + + + hama.pipes.logging + false + + Index: c++/src/main/native/examples/conf/piestimator.xml =================================================================== --- c++/src/main/native/examples/conf/piestimator.xml (revision 0) +++ c++/src/main/native/examples/conf/piestimator.xml (revision 0) @@ -0,0 +1,48 @@ + + + + + + hama.pipes.executable + hdfs:/examples/bin/piestimator + + + hama.pipes.java.recordreader + true + + + hama.pipes.java.recordwriter + true + + + bsp.input.format.class + org.apache.hama.bsp.NullInputFormat + + + bsp.output.format.class + org.apache.hama.bsp.TextOutputFormat + + + hama.pipes.logging + false + + + bsp.peers.num + 3 + + Index: c++/src/main/native/examples/conf/matrixmultiplication.xml =================================================================== --- c++/src/main/native/examples/conf/matrixmultiplication.xml (revision 0) +++ c++/src/main/native/examples/conf/matrixmultiplication.xml (revision 0) @@ -0,0 +1,56 @@ + + + + + + hama.pipes.executable + hdfs:/examples/bin/matrixmultiplication + + + hama.pipes.java.recordreader + true + + + hama.pipes.java.recordwriter + true + + + bsp.input.format.class + org.apache.hama.bsp.SequenceFileInputFormat + + + bsp.input.dir + /examples/input/matrixmultiplication/MatrixA.seq + + + hama.mat.mult.B.path + /examples/input/matrixmultiplication/MatrixB_transposed.seq + + + hama.pipes.logging + false + + + hama.messenger.queue.class + org.apache.hama.bsp.message.queue.SortedMessageQueue + + + bsp.input.partitioner.class + org.apache.hama.pipes.PipesPartitioner + + Index: c++/src/main/native/pipes/impl/HamaPipes.cc =================================================================== --- c++/src/main/native/pipes/impl/HamaPipes.cc (revision 1526847) +++ 
c++/src/main/native/pipes/impl/HamaPipes.cc (working copy) @@ -50,11 +50,11 @@ using namespace HadoopUtils; namespace HamaPipes { - + bool logging; /********************************************/ - /****************** BSPJob ******************/ + /****************** BSPJob ******************/ /********************************************/ class BSPJobImpl: public BSPJob { private: @@ -63,37 +63,37 @@ void set(const string& key, const string& value) { values[key] = value; } - + virtual bool hasKey(const string& key) const { return values.find(key) != values.end(); } - + virtual const string& get(const string& key) const { map::const_iterator itr = values.find(key); if (itr == values.end()) { throw Error("Key " + key + " not found in BSPJob"); - } + } return itr->second; } - + virtual int getInt(const string& key) const { const string& val = get(key); return toInt(val); } - + virtual float getFloat(const string& key) const { const string& val = get(key); return toFloat(val); } - + virtual bool getBoolean(const string&key) const { const string& val = get(key); return toBool(val); } }; - + /********************************************/ - /************* DownwardProtocol *************/ + /************* DownwardProtocol *************/ /********************************************/ class DownwardProtocol { public: @@ -101,26 +101,26 @@ virtual void setBSPJob(vector values) = 0; virtual void setInputTypes(string keyType, string valueType) = 0; virtual void setKeyValue(const string& _key, const string& _value) = 0; - + virtual void runBsp(bool pipedInput, bool pipedOutput) = 0; virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0; virtual void runSetup(bool pipedInput, bool pipedOutput) = 0; - virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0; - + virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0; + virtual void setNewResult(int32_t value) = 0; - virtual void setNewResult(int64_t value) = 0; 
+ virtual void setNewResult(int64_t value) = 0; virtual void setNewResult(const string& value) = 0; virtual void setNewResult(vector value) = 0; - + //virtual void reduceKey(const string& key) = 0; //virtual void reduceValue(const string& value) = 0; virtual void close() = 0; virtual void abort() = 0; virtual ~DownwardProtocol() {} }; - + /********************************************/ - /************** UpwardProtocol **************/ + /************** UpwardProtocol **************/ /********************************************/ class UpwardProtocol { public: @@ -129,15 +129,15 @@ virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0; virtual void sendCMD(int32_t cmd, const string& value) = 0; virtual void sendCMD(int32_t cmd, const string values[], int size) = 0; - + //virtual void registerCounter(int id, const string& group, const string& name) = 0; //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0; virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0; virtual ~UpwardProtocol() {} }; - + /********************************************/ - /***************** Protocol *****************/ + /***************** Protocol *****************/ /********************************************/ class Protocol { public: @@ -145,47 +145,47 @@ virtual UpwardProtocol* getUplink() = 0; virtual ~Protocol(){} }; - + /********************************************/ - /*************** MESSAGE_TYPE ***************/ + /*************** MESSAGE_TYPE ***************/ /********************************************/ enum MESSAGE_TYPE { - START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES, - RUN_SETUP, RUN_BSP, RUN_CLEANUP, - READ_KEYVALUE, WRITE_KEYVALUE, - GET_MSG, GET_MSG_COUNT, - SEND_MSG, SYNC, - GET_ALL_PEERNAME, GET_PEERNAME, - GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT, - REOPEN_INPUT, CLEAR, - CLOSE, ABORT, - DONE, TASK_DONE, - REGISTER_COUNTER, INCREMENT_COUNTER, - SEQFILE_OPEN, 
SEQFILE_READNEXT, - SEQFILE_APPEND, SEQFILE_CLOSE, - PARTITION_REQUEST, PARTITION_RESPONSE + START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES, + RUN_SETUP, RUN_BSP, RUN_CLEANUP, + READ_KEYVALUE, WRITE_KEYVALUE, + GET_MSG, GET_MSG_COUNT, + SEND_MSG, SYNC, + GET_ALL_PEERNAME, GET_PEERNAME, + GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT, + REOPEN_INPUT, CLEAR, + CLOSE, ABORT, + DONE, TASK_DONE, + REGISTER_COUNTER, INCREMENT_COUNTER, + SEQFILE_OPEN, SEQFILE_READNEXT, + SEQFILE_APPEND, SEQFILE_CLOSE, + PARTITION_REQUEST, PARTITION_RESPONSE }; - + /* Only needed for debugging output */ const char* messageTypeNames[] = { - stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ), - stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ), - stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ), - stringify( GET_MSG ), stringify( GET_MSG_COUNT ), - stringify( SEND_MSG ), stringify( SYNC ), - stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ), - stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ), - stringify( REOPEN_INPUT ), stringify( CLEAR ), - stringify( CLOSE ), stringify( ABORT ), - stringify( DONE ), stringify( TASK_DONE ), - stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ), - stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ), - stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ), - stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE ) - }; - + stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ), + stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ), + stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ), + stringify( GET_MSG ), stringify( GET_MSG_COUNT ), + stringify( SEND_MSG ), stringify( SYNC ), + stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ), + stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ), + stringify( REOPEN_INPUT ), 
stringify( CLEAR ), + stringify( CLOSE ), stringify( ABORT ), + stringify( DONE ), stringify( TASK_DONE ), + stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ), + stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ), + stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ), + stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE ) + }; + /********************************************/ - /*********** BinaryUpwardProtocol ***********/ + /*********** BinaryUpwardProtocol ***********/ /********************************************/ class BinaryUpwardProtocol: public UpwardProtocol { private: @@ -194,66 +194,70 @@ BinaryUpwardProtocol(FILE* _stream) { stream = new FileOutStream(); HADOOP_ASSERT(stream->open(_stream), "problem opening stream"); - + } - + virtual void sendCMD(int32_t cmd) { serializeInt(cmd, *stream); stream->flush(); if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n", - messageTypeNames[cmd]); + messageTypeNames[cmd]); } - + virtual void sendCMD(int32_t cmd, int32_t value) { serializeInt(cmd, *stream); serializeInt(value, *stream); stream->flush(); - if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",messageTypeNames[cmd],value); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n", + messageTypeNames[cmd],value); } - + virtual void sendCMD(int32_t cmd, const string& value) { serializeInt(cmd, *stream); serializeString(value, *stream); stream->flush(); - if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str()); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n", + messageTypeNames[cmd],value.c_str()); } - + virtual void sendCMD(int32_t cmd, const string values[], int size) { - serializeInt(cmd, *stream); - for (int i=0; iflush(); } - + virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) { 
serializeInt(cmd, *stream); serializeInt(value, *stream); - for (int i=0; iflush(); } - + /* - virtual void registerCounter(int id, const string& group, - const string& name) { - serializeInt(REGISTER_COUNTER, *stream); - serializeInt(id, *stream); - serializeString(group, *stream); - serializeString(name, *stream); - } + virtual void registerCounter(int id, const string& group, + const string& name) { + serializeInt(REGISTER_COUNTER, *stream); + serializeInt(id, *stream); + serializeString(group, *stream); + serializeString(name, *stream); + } + + virtual void incrementCounter(const TaskContext::Counter* counter, + uint64_t amount) { + serializeInt(INCREMENT_COUNTER, *stream); + serializeInt(counter->getId(), *stream); + serializeLong(amount, *stream); + } + */ - virtual void incrementCounter(const TaskContext::Counter* counter, - uint64_t amount) { - serializeInt(INCREMENT_COUNTER, *stream); - serializeInt(counter->getId(), *stream); - serializeLong(amount, *stream); - } - */ - virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { serializeInt(INCREMENT_COUNTER, *stream); serializeString(group, *stream); @@ -262,235 +266,258 @@ stream->flush(); if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n"); } - + ~BinaryUpwardProtocol() { delete stream; } }; - + /********************************************/ - /************** BinaryProtocol **************/ + /************** BinaryProtocol **************/ /********************************************/ class BinaryProtocol: public Protocol { private: FileInStream* downStream; DownwardProtocol* handler; BinaryUpwardProtocol * uplink; - + string key; string value; - + public: BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) { downStream = new FileInStream(); downStream->open(down); uplink = new BinaryUpwardProtocol(up); handler = _handler; - - //authDone = false; - //getPassword(password); } - + UpwardProtocol* getUplink() { return uplink; } - 
- + + virtual void nextEvent() { int32_t cmd; cmd = deserializeInt(*downStream); - - switch (cmd) { - - case START_MESSAGE: { - int32_t prot; - prot = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot); - handler->start(prot); - break; - } - /* SET BSP Job Configuration / Environment */ - case SET_BSPJOB_CONF: { - int32_t entries; - entries = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries); - vector result(entries*2); - for(int i=0; i < entries*2; ++i) { - string item; - deserializeString(item, *downStream); - result.push_back(item); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str()); + + switch (cmd) { + + case START_MESSAGE: { + int32_t prot; + prot = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot); + handler->start(prot); + break; } - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries); - handler->setBSPJob(result); - break; - } - case SET_INPUT_TYPES: { - string keyType; - string valueType; - deserializeString(keyType, *downStream); - deserializeString(valueType, *downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n", - keyType.c_str(),valueType.c_str()); - handler->setInputTypes(keyType, valueType); - break; - } - case READ_KEYVALUE: { - deserializeString(key, *downStream); - deserializeString(value, *downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n", - key.c_str(), - ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); - handler->setKeyValue(key, value); - break; - } - case RUN_SETUP: { - 
if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); - int32_t pipedInput; - int32_t pipedOutput; - pipedInput = deserializeInt(*downStream); - pipedOutput = deserializeInt(*downStream); - handler->runSetup(pipedInput, pipedOutput); - break; - } - case RUN_BSP: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); - int32_t pipedInput; - int32_t pipedOutput; - pipedInput = deserializeInt(*downStream); - pipedOutput = deserializeInt(*downStream); - handler->runBsp(pipedInput, pipedOutput); - break; - } - case RUN_CLEANUP: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); - int32_t pipedInput; - int32_t pipedOutput; - pipedInput = deserializeInt(*downStream); - pipedOutput = deserializeInt(*downStream); - handler->runCleanup(pipedInput, pipedOutput); - break; - } - - case PARTITION_REQUEST: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); - string partionKey; - string partionValue; - int32_t numTasks; - deserializeString(partionKey, *downStream); - deserializeString(partionValue, *downStream); - numTasks = deserializeInt(*downStream); - handler->runPartition(partionKey, partionValue, numTasks); - break; - } - - - case GET_MSG_COUNT: { - int32_t msgCount = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount); - handler->setNewResult(msgCount); - break; - } - case GET_MSG: { - string msg; - deserializeString(msg,*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str()); - handler->setNewResult(msg); - break; - } - case GET_PEERNAME: { - string peername; - deserializeString(peername,*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str()); - handler->setNewResult(peername); - break; - } - case 
GET_ALL_PEERNAME: { - vector peernames; - int32_t peernameCount = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount); - string peername; - for (int i=0; i result(entries*2); + for(int i=0; i < entries*2; ++i) { + string item; + deserializeString(item, *downStream); + result.push_back(item); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", + item.c_str()); + } + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", + entries); + handler->setBSPJob(result); + break; + } + case SET_INPUT_TYPES: { + string keyType; + string valueType; + deserializeString(keyType, *downStream); + deserializeString(valueType, *downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n", + keyType.c_str(),valueType.c_str()); + handler->setInputTypes(keyType, valueType); + break; + } + case READ_KEYVALUE: { + deserializeString(key, *downStream); + deserializeString(value, *downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n", + key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str())); + handler->setKeyValue(key, value); + break; + } + case RUN_SETUP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); + int32_t pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runSetup(pipedInput, pipedOutput); + break; + } + case RUN_BSP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); + int32_t pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runBsp(pipedInput, pipedOutput); + break; + } + 
case RUN_CLEANUP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); + int32_t pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runCleanup(pipedInput, pipedOutput); + break; + } + + case PARTITION_REQUEST: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); + string partionKey; + string partionValue; + int32_t numTasks; + deserializeString(partionKey, *downStream); + deserializeString(partionValue, *downStream); + numTasks = deserializeInt(*downStream); + handler->runPartition(partionKey, partionValue, numTasks); + break; + } + + case GET_MSG_COUNT: { + int32_t msgCount = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n", + msgCount); + handler->setNewResult(msgCount); + break; + } + case GET_MSG: { + string msg; + deserializeString(msg,*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n", + msg.c_str()); + handler->setNewResult(msg); + break; + } + case GET_PEERNAME: { + string peername; deserializeString(peername,*downStream); - peernames.push_back(peername); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",peername.c_str()); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n", + peername.c_str()); + handler->setNewResult(peername); + break; } - handler->setNewResult(peernames); - break; - } - case GET_PEER_INDEX: { - int32_t peerIndex = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex); - handler->setNewResult(peerIndex); - break; - } - case GET_PEER_COUNT: { - int32_t peerCount = deserializeInt(*downStream); - 
if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount); - handler->setNewResult(peerCount); - break; - } - case GET_SUPERSTEP_COUNT: { - int64_t superstepCount = deserializeLong(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount); - handler->setNewResult(superstepCount); - break; - } - - - case SEQFILE_OPEN: { - int32_t fileID = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID); - handler->setNewResult(fileID); - break; - } - case SEQFILE_READNEXT: { - deserializeString(key, *downStream); - deserializeString(value, *downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n", - key.c_str(), - ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); - handler->setKeyValue(key, value); - break; - } - case SEQFILE_APPEND: { + case GET_ALL_PEERNAME: { + vector peernames; + int32_t peernameCount = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n", + peernameCount); + string peername; + for (int i=0; isetNewResult(peernames); + break; + } + case GET_PEER_INDEX: { + int32_t peerIndex = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n", + peerIndex); + handler->setNewResult(peerIndex); + break; + } + case GET_PEER_COUNT: { + int32_t peerCount = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n", + peerCount); + handler->setNewResult(peerCount); + break; + } + case GET_SUPERSTEP_COUNT: { + int64_t superstepCount = deserializeLong(*downStream); + if(logging) + 
fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n", + (long)superstepCount); + handler->setNewResult(superstepCount); + break; + } + + + case SEQFILE_OPEN: { + int32_t fileID = deserializeInt(*downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID); + handler->setNewResult(fileID); + break; + } + case SEQFILE_READNEXT: { + deserializeString(key, *downStream); + deserializeString(value, *downStream); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n", + key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str())); + handler->setKeyValue(key, value); + break; + } + case SEQFILE_APPEND: { int32_t result = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result); handler->setNewResult(result); break; - } - case SEQFILE_CLOSE: { + } + case SEQFILE_CLOSE: { int32_t result = deserializeInt(*downStream); - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result); + if(logging) + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result); handler->setNewResult(result); break; + } + + + case CLOSE: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); + handler->close(); + break; + } + case ABORT: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); + handler->abort(); + break; + } + default: + HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd); } - - - case CLOSE: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got 
CLOSE\n"); - handler->close(); - break; - } - case ABORT: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); - handler->abort(); - break; - } - default: - HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd); - } - } - + } + virtual ~BinaryProtocol() { delete downStream; delete uplink; } }; - + /********************************************/ - /************** BSPContextImpl **************/ + /************** BSPContextImpl **************/ /********************************************/ class BSPContextImpl: public BSPContext, public DownwardProtocol { private: @@ -509,7 +536,7 @@ //float progressFloat; //uint64_t lastProgress; //bool statusSet; - + Protocol* protocol; UpwardProtocol *uplink; @@ -517,36 +544,36 @@ RecordReader* reader; RecordWriter* writer; - + BSP* bsp; Partitioner* partitioner; const Factory* factory; pthread_mutex_t mutexDone; std::vector registeredCounterIds; - + int32_t resultInt; - bool isNewResultInt; + bool isNewResultInt; int64_t resultLong; - bool isNewResultLong; + bool isNewResultLong; string resultString; - bool isNewResultString; + bool isNewResultString; vector resultVector; - bool isNewResultVector; + bool isNewResultVector; - bool isNewKeyValuePair; + bool isNewKeyValuePair; string currentKey; string currentValue; - + public: - + BSPContextImpl(const Factory& _factory) { //statusSet = false; done = false; //newKey = NULL; factory = &_factory; job = NULL; - + inputKeyClass = NULL; inputValueClass = NULL; @@ -563,26 +590,26 @@ //progressFloat = 0.0f; hasTask = false; pthread_mutex_init(&mutexDone, NULL); - + isNewResultInt = false; isNewResultString = false, isNewResultVector = false; - + isNewKeyValuePair = false; } - - + + /********************************************/ - /*********** DownwardProtocol IMPL **********/ + /*********** DownwardProtocol IMPL **********/ 
/********************************************/ virtual void start(int protocol) { if (protocol != 0) { - throw Error("Protocol version " + toString(protocol) + + throw Error("Protocol version " + toString(protocol) + " not supported"); } partitioner = factory->createPartitioner(*this); } - + virtual void setBSPJob(vector values) { int len = values.size(); BSPJobImpl* result = new BSPJobImpl(); @@ -592,24 +619,25 @@ } job = result; } - + virtual void setInputTypes(string keyType, string valueType) { inputKeyClass = new string(keyType); inputValueClass = new string(valueType); } - + virtual void setKeyValue(const string& _key, const string& _value) { currentKey = _key; currentValue = _value; isNewKeyValuePair = true; } - + /* private Method */ void setupReaderWriter(bool pipedInput, bool pipedOutput) { - - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n", - (pipedInput)?"true":"false",(pipedOutput)?"true":"false"); - + + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n", + (pipedInput)?"true":"false",(pipedOutput)?"true":"false"); + if (pipedInput && reader==NULL) { reader = factory->createRecordReader(*this); HADOOP_ASSERT((reader == NULL) == pipedInput, @@ -617,10 +645,10 @@ "RecordReader not defined"); //if (reader != NULL) { - // value = new string(); + // value = new string(); //} - } - + } + if (pipedOutput && writer==NULL) { writer = factory->createRecordWriter(*this); HADOOP_ASSERT((writer == NULL) == pipedOutput, @@ -628,13 +656,13 @@ "RecordWriter not defined"); } } - + virtual void runSetup(bool pipedInput, bool pipedOutput) { setupReaderWriter(pipedInput,pipedOutput); - if (bsp == NULL) + if (bsp == NULL) bsp = factory->createBSP(*this); - + if (bsp != NULL) { hasTask = true; bsp->setup(*this); @@ -642,13 +670,13 @@ uplink->sendCMD(TASK_DONE); } } - + virtual void runBsp(bool pipedInput, bool pipedOutput) { 
setupReaderWriter(pipedInput,pipedOutput); - - if (bsp == NULL) - bsp = factory->createBSP(*this); - + + if (bsp == NULL) + bsp = factory->createBSP(*this); + if (bsp != NULL) { hasTask = true; bsp->bsp(*this); @@ -656,10 +684,10 @@ uplink->sendCMD(TASK_DONE); } } - + virtual void runCleanup(bool pipedInput, bool pipedOutput) { setupReaderWriter(pipedInput,pipedOutput); - + if (bsp != NULL) { hasTask = true; bsp->cleanup(*this); @@ -667,54 +695,56 @@ uplink->sendCMD(TASK_DONE); } } - + /********************************************/ - /******* Partitioner *******/ - /********************************************/ + /******* Partitioner *******/ + /********************************************/ virtual void runPartition(const string& key, const string& value, int32_t numTasks){ - if (partitioner != NULL) { + if (partitioner != NULL) { int part = partitioner->partition(key, value, numTasks); uplink->sendCMD(PARTITION_RESPONSE, part); } else { - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); } - } - + } + virtual void setNewResult(int32_t _value) { resultInt = _value; - isNewResultInt = true; + isNewResultInt = true; } - + virtual void setNewResult(int64_t _value) { resultLong = _value; - isNewResultLong = true; + isNewResultLong = true; } - + virtual void setNewResult(const string& _value) { resultString = _value; - isNewResultString = true; + isNewResultString = true; } - + virtual void setNewResult(vector _value) { resultVector = _value; - isNewResultVector = true; + isNewResultVector = true; } - + virtual void close() { pthread_mutex_lock(&mutexDone); done = true; hasTask = false; pthread_mutex_unlock(&mutexDone); - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n", + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n", 
(done)?"true":"false",(hasTask)?"true":"false"); } - + virtual void abort() { throw Error("Aborted by driver"); } - + /********************************************/ - /************** TaskContext IMPL ************/ + /************** TaskContext IMPL ************/ /********************************************/ /** @@ -723,78 +753,78 @@ virtual const BSPJob* getBSPJob() { return job; } - + /** - * Get the current key. + * Get the current key. * @return the current key or NULL if called before the first map or reduce */ //virtual const string& getInputKey() { // return key; //} - + /** - * Get the current value. - * @return the current value or NULL if called before the first map or + * Get the current value. + * @return the current value or NULL if called before the first map or * reduce */ //virtual const string& getInputValue() { // return *value; //} - + /** * Register a counter with the given group and name. */ /* - virtual Counter* getCounter(const std::string& group, - const std::string& name) { - int id = registeredCounterIds.size(); - registeredCounterIds.push_back(id); - uplink->registerCounter(id, group, name); - return new Counter(id); - }*/ - + virtual Counter* getCounter(const std::string& group, + const std::string& name) { + int id = registeredCounterIds.size(); + registeredCounterIds.push_back(id); + uplink->registerCounter(id, group, name); + return new Counter(id); + }*/ + /** * Increment the value of the counter with the given amount. */ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { - uplink->incrementCounter(group, name, amount); + uplink->incrementCounter(group, name, amount); } - + /********************************************/ - /************** BSPContext IMPL *************/ + /************** BSPContext IMPL *************/ /********************************************/ - + /** * Access the InputSplit of the bsp. 
*/ //virtual const string& getInputSplit() { // return *inputSplit; //} - + /** * Get the name of the key class of the input to this task. */ virtual const string& getInputKeyClass() { return *inputKeyClass; } - + /** * Get the name of the value class of the input to this task. */ virtual const string& getInputValueClass() { return *inputValueClass; } - + /** * Send a data with a tag to another BSPSlave corresponding to hostname. * Messages sent by this method are not guaranteed to be received in a sent * order. */ virtual void sendMessage(const string& peerName, const string& msg) { - string values[] = {peerName, msg}; - uplink->sendCMD(SEND_MSG,values, 2); + string values[] = {peerName, msg}; + uplink->sendCMD(SEND_MSG,values, 2); } - + /** * @return A message from the peer's received messages queue (a FIFO). */ @@ -802,29 +832,31 @@ uplink->sendCMD(GET_MSG); while (!isNewResultString) - protocol->nextEvent(); - + protocol->nextEvent(); + isNewResultString = false; - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str()); + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n", + resultString.c_str()); return resultString; } - + /** * @return The number of messages in the peer's received messages queue. */ virtual int getNumCurrentMessages() { uplink->sendCMD(GET_MSG_COUNT); - + while (!isNewResultInt) protocol->nextEvent(); isNewResultInt = false; return resultInt; } - + /** * Barrier Synchronization. - * + * * Sends all the messages in the outgoing message queues to the corresponding * remote peers. */ @@ -834,14 +866,16 @@ /** * @return the name of this peer in the format "hostname:port". 
- */ + */ virtual const string& getPeerName() { uplink->sendCMD(GET_PEERNAME,-1); - + while (!isNewResultString) protocol->nextEvent(); - - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str()); + + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n", + resultString.c_str()); isNewResultString = false; return resultString; } @@ -851,11 +885,13 @@ */ virtual const string& getPeerName(int index) { uplink->sendCMD(GET_PEERNAME,index); - + while (!isNewResultString) protocol->nextEvent(); - - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str()); + + if(logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n", + resultString.c_str()); isNewResultString = false; return resultString; } @@ -866,10 +902,10 @@ */ virtual vector getAllPeerNames() { uplink->sendCMD(GET_ALL_PEERNAME); - + while (!isNewResultVector) protocol->nextEvent(); - + isNewResultVector = false; return resultVector; } @@ -879,39 +915,39 @@ */ virtual int getPeerIndex() { uplink->sendCMD(GET_PEER_INDEX); - + while (!isNewResultInt) protocol->nextEvent(); - + isNewResultInt = false; return resultInt; } - + /** * @return the number of peers */ virtual int getNumPeers() { uplink->sendCMD(GET_PEER_COUNT); - + while (!isNewResultInt) protocol->nextEvent(); - + isNewResultInt = false; - return resultInt; + return resultInt; } - + /** * @return the count of current super-step */ virtual long getSuperstepCount() { uplink->sendCMD(GET_SUPERSTEP_COUNT); - + while (!isNewResultLong) protocol->nextEvent(); - + isNewResultLong = false; - return resultLong; - } + return resultLong; + } /** * Clears all queues entries. 
@@ -919,17 +955,17 @@ virtual void clear() { uplink->sendCMD(CLEAR); } - + /** * Writes a key/value pair to the output collector */ virtual void write(const string& key, const string& value) { - if (writer != NULL) { - writer->emit(key, value); - } else { - string values[] = {key, value}; - uplink->sendCMD(WRITE_KEYVALUE, values, 2); - } + if (writer != NULL) { + writer->emit(key, value); + } else { + string values[] = {key, value}; + uplink->sendCMD(WRITE_KEYVALUE, values, 2); + } } /** @@ -937,21 +973,30 @@ */ virtual bool readNext(string& _key, string& _value) { uplink->sendCMD(READ_KEYVALUE); - + while (!isNewKeyValuePair) protocol->nextEvent(); isNewKeyValuePair = false; - + _key = currentKey; - _value = currentValue; - if (logging && _key.empty() && _value.empty()) + // check if value is array [0, 1, 2, ...], and remove brackets + int len = currentValue.length(); + if ( (currentValue[0]=='[') && + (currentValue[len-1]==']') ) { + _value = currentValue.substr(1,len-2); + } else { + _value = currentValue; + } + + if (logging && _key.empty() && _value.empty()) { fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n"); - + } + return (!_key.empty() && !_value.empty()); } - + /** * Closes the input and opens it right away, so that the file pointer is at * the beginning again. 
@@ -959,154 +1004,163 @@ virtual void reopenInput() { uplink->sendCMD(REOPEN_INPUT); } - - + + /********************************************/ - /******* SequenceFileConnector IMPL *******/ - /********************************************/ - + /******* SequenceFileConnector IMPL *******/ + /********************************************/ + /** * Open SequenceFile with opion "r" or "w" * @return the corresponding fileID */ - virtual int sequenceFileOpen(const string& path, const string& option, + virtual int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType) { - if (logging) - fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str()); - + if (logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n", + path.c_str()); + if ( (option.compare("r")==0) || (option.compare("w")==0)) { - - string values[] = {path, option, keyType, valueType}; - uplink->sendCMD(SEQFILE_OPEN,values, 4); - - while (!isNewResultInt) - protocol->nextEvent(); - isNewResultInt = false; - return resultInt; - } else { - fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str()); - return -1; //Error wrong option + string values[] = {path, option, keyType, valueType}; + uplink->sendCMD(SEQFILE_OPEN,values, 4); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return resultInt; + } else { + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n", + option.c_str()); + return -1; //Error wrong option } } - + /** * Read next key/value pair from the SequenceFile with fileID */ virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) { - + uplink->sendCMD(SEQFILE_READNEXT,fileID); - + while (!isNewKeyValuePair) protocol->nextEvent(); - + isNewKeyValuePair = false; - + _key = currentKey; - _value = currentValue; - - if (logging && _key.empty() && _value.empty()) + // check if value is 
array [0, 1, 2, ...], and remove brackets + int len = currentValue.length(); + if ( (currentValue[0]=='[') && + (currentValue[len-1]==']') ) { + _value = currentValue.substr(1,len-2); + } else { + _value = currentValue; + } + + if (logging && _key.empty() && _value.empty()) fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n"); - + return (!_key.empty() && !_value.empty()); } - + /** * Append the next key/value pair to the SequenceFile with fileID */ virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) { string values[] = {key, value}; uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2); - + while (!isNewResultInt) protocol->nextEvent(); - + isNewResultInt = false; return (resultInt==1); } - + /** * Close SequenceFile */ virtual bool sequenceFileClose(int fileID) { uplink->sendCMD(SEQFILE_CLOSE,fileID); - + while (!isNewResultInt) protocol->nextEvent(); - - if (logging && resultInt==0) + + if (logging && resultInt==0) fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n"); else if (logging) fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n"); - + isNewResultInt = false; return (resultInt==1); } - + /********************************************/ - /*************** Other STUFF ***************/ + /*************** Other STUFF ***************/ /********************************************/ - + void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) { - protocol = _protocol; - uplink = _uplink; + protocol = _protocol; + uplink = _uplink; } - + bool isDone() { - pthread_mutex_lock(&mutexDone); - bool doneCopy = done; - pthread_mutex_unlock(&mutexDone); - return doneCopy; + pthread_mutex_lock(&mutexDone); + bool doneCopy = done; + pthread_mutex_unlock(&mutexDone); + return doneCopy; } - + /** * Advance to the next value. 
*/ /* - bool nextValue() { - if (isNewKey || done) { - return false; - } - isNewValue = false; - //progress(); + bool nextValue() { + if (isNewKey || done) { + return false; + } + isNewValue = false; + //progress(); + protocol->nextEvent(); + return isNewValue; + } + */ + void waitForTask() { + while (!done && !hasTask) { + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n", + (done)?"true":"false",(hasTask)?"true":"false"); protocol->nextEvent(); - return isNewValue; - } - */ - void waitForTask() { - while (!done && !hasTask) { - if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n", - (done)?"true":"false",(hasTask)?"true":"false"); - protocol->nextEvent(); - } + } } /* - bool nextKey() { - if (reader == NULL) { - while (!isNewKey) { - nextValue(); - if (done) { - return false; - } - } - key = *newKey; - } else { - if (!reader->next(key, const_cast(*value))) { - pthread_mutex_lock(&mutexDone); - done = true; - pthread_mutex_unlock(&mutexDone); - return false; - } - //progressFloat = reader->getProgress(); - } - isNewKey = false; - - if (bsp != NULL) { - bsp->bsp(*this); - } - return true; - } - */ + bool nextKey() { + if (reader == NULL) { + while (!isNewKey) { + nextValue(); + if (done) { + return false; + } + } + key = *newKey; + } else { + if (!reader->next(key, const_cast(*value))) { + pthread_mutex_lock(&mutexDone); + done = true; + pthread_mutex_unlock(&mutexDone); + return false; + } + //progressFloat = reader->getProgress(); + } + isNewKey = false; + + if (bsp != NULL) { + bsp->bsp(*this); + } + return true; + } + */ void closeAll() { if (reader) { reader->close(); @@ -1115,12 +1169,12 @@ if (bsp) { bsp->close(); } - + if (writer) { writer->close(); } } - + virtual ~BSPContextImpl() { delete job; delete inputKeyClass; @@ -1135,9 +1189,9 @@ pthread_mutex_destroy(&mutexDone); } }; - + /** - * Ping the parent every 5 seconds to know if it is alive + * Ping the parent every 5 
seconds to know if it is alive */ void* ping(void* ptr) { BSPContextImpl* context = (BSPContextImpl*) ptr; @@ -1156,11 +1210,13 @@ addr.sin_family = AF_INET; addr.sin_port = htons(toInt(portStr)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr); + if(logging) + fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", + portStr); HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, string("problem connecting command socket: ") + strerror(errno)); - + } if (sock != -1) { int result = shutdown(sock, SHUT_RDWR); @@ -1171,8 +1227,8 @@ remaining_retries = MAX_RETRIES; } catch (Error& err) { if (!context->isDone()) { - fprintf(stderr, "Hama Pipes Exception: in ping %s\n", - err.getMessage().c_str()); + fprintf(stderr, "Hama Pipes Exception: in ping %s\n", + err.getMessage().c_str()); remaining_retries -= 1; if (remaining_retries == 0) { exit(1); @@ -1184,23 +1240,25 @@ } return NULL; } - + /** * Run the assigned task in the framework. - * The user's main function should set the various functions using the + * The user's main function should set the various functions using the * set* functions above and then call this. * @return true, if the task succeeded. 
*/ bool runTask(const Factory& factory) { try { HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!"); - - logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true; - if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false"); - + + logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true; + if(logging) + fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", + ((logging)?"true":"false")); + BSPContextImpl* context = new BSPContextImpl(factory); Protocol* connection; - + char* portStr = getenv("hama.pipes.command.port"); int sock = -1; FILE* stream = NULL; @@ -1218,10 +1276,10 @@ HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, string("problem connecting command socket: ") + strerror(errno)); - + stream = fdopen(sock, "r"); outStream = fdopen(sock, "w"); - + // increase buffer size int bufsize = 128*1024; int setbuf; @@ -1229,14 +1287,16 @@ bufout = new char[bufsize]; setbuf = setvbuf(stream, bufin, _IOFBF, bufsize); HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ") - + strerror(errno)); + + strerror(errno)); setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize); HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ") - + strerror(errno)); - + + strerror(errno)); + connection = new BinaryProtocol(stream, context, outStream); - if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr); - + if(logging) + fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", + portStr); + } else if (getenv("hama.pipes.command.file")) { char* filename = getenv("hama.pipes.command.file"); string outFilename = filename; @@ -1249,18 +1309,18 @@ fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n"); return -1; } - + context->setProtocol(connection, connection->getUplink()); - + //pthread_t pingThread; //pthread_create(&pingThread, NULL, ping, (void*)(context)); context->waitForTask(); - + 
//while (!context->isDone()) { - //context->nextKey(); + //context->nextKey(); //} - + context->closeAll(); connection->getUplink()->sendCMD(DONE); @@ -1286,12 +1346,12 @@ } if (outStream != NULL) { //fclose(outStream); - } + } delete bufin; delete bufout; return true; } catch (Error& err) { - fprintf(stderr, "Hama Pipes Exception: %s\n", + fprintf(stderr, "Hama Pipes Exception: %s\n", err.getMessage().c_str()); return false; } Index: c++/src/main/native/utils/impl/Splitter.cc =================================================================== --- c++/src/main/native/utils/impl/Splitter.cc (revision 0) +++ c++/src/main/native/utils/impl/Splitter.cc (revision 0) @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "hadoop/Splitter.hh" + +#include <string> +#include <vector> + +namespace HadoopUtils { + + + Splitter::Splitter ( const std::string& src, const std::string& delim ) { + reset ( src, delim ); + } + + std::string& Splitter::operator[] ( size_type i ) { + return _tokens.at ( i ); + } + + Splitter::size_type Splitter::size() { + return _tokens.size(); + } + + void Splitter::reset ( const std::string& src, const std::string& delim ) { + std::vector<std::string> tokens; + std::string::size_type start = 0; + std::string::size_type end; + + for ( ; ; ) { + end = src.find ( delim, start ); + tokens.push_back ( src.substr ( start, end - start ) ); + + // We just copied the last token + if ( end == std::string::npos ) + break; + + // Exclude the delimiter in the next search + start = end + delim.size(); + } + + _tokens.swap ( tokens ); + } + +} Index: c++/src/CMakeLists.txt =================================================================== --- c++/src/CMakeLists.txt (revision 1526847) +++ c++/src/CMakeLists.txt (working copy) @@ -45,24 +45,25 @@ ) # Example programs -# add_executable(wordcount-simple main/native/examples/impl/wordcount-simple.cc) -# target_link_libraries(wordcount-simple hadooppipes hadooputils) -# output_directory(wordcount-simple examples) +add_executable(summation main/native/examples/impl/summation.cc) +target_link_libraries(summation hamapipes hadooputils) +output_directory(summation examples) -# add_executable(wordcount-part main/native/examples/impl/wordcount-part.cc) -# target_link_libraries(wordcount-part hadooppipes hadooputils) -# output_directory(wordcount-part examples) +add_executable(piestimator main/native/examples/impl/piestimator.cc) +target_link_libraries(piestimator hamapipes hadooputils) +output_directory(piestimator examples) -# add_executable(wordcount-nopipe main/native/examples/impl/wordcount-nopipe.cc) -# target_link_libraries(wordcount-nopipe hadooppipes hadooputils) -# output_directory(wordcount-nopipe examples) 
+add_executable(matrixmultiplication main/native/examples/impl/matrixmultiplication.cc) +target_link_libraries(matrixmultiplication DenseDoubleVector hamapipes hadooputils) +output_directory(matrixmultiplication examples) -# add_executable(pipes-sort main/native/examples/impl/sort.cc) -# target_link_libraries(pipes-sort hadooppipes hadooputils) -# output_directory(pipes-sort examples) +add_library(DenseDoubleVector + main/native/examples/impl/DenseDoubleVector.cc +) add_library(hadooputils STATIC main/native/utils/impl/StringUtils.cc + main/native/utils/impl/Splitter.cc main/native/utils/impl/SerialUtils.cc )