Index: c++/src/main/native/examples/impl/piestimator.cc =================================================================== --- c++/src/main/native/examples/impl/piestimator.cc (revision 1556897) +++ c++/src/main/native/examples/impl/piestimator.cc (working copy) @@ -18,7 +18,6 @@ #include "hama/Pipes.hh" #include "hama/TemplateFactory.hh" -#include "hadoop/StringUtils.hh" #include #include @@ -27,32 +26,30 @@ #include using std::string; -using std::cout; using HamaPipes::BSP; using HamaPipes::BSPContext; -using namespace HadoopUtils; -class PiEstimatorBSP: public BSP { - private: +class PiEstimatorBSP: public BSP { +private: string master_task_; long iterations_; // iterations_per_bsp_task - public: - PiEstimatorBSP(BSPContext& context) { - iterations_ = 1000000L; +public: + PiEstimatorBSP(BSPContext& context) { + iterations_ = 10000000L; } inline double closed_interval_rand(double x0, double x1) { return x0 + (x1 - x0) * rand() / ((double) RAND_MAX); } - void setup(BSPContext& context) { + void setup(BSPContext& context) { // Choose one as a master master_task_ = context.getPeerName(context.getNumPeers() / 2); } - void bsp(BSPContext& context) { + void bsp(BSPContext& context) { /* initialize random seed */ srand(time(NULL)); @@ -70,7 +67,7 @@ context.sync(); } - void cleanup(BSPContext& context) { + void cleanup(BSPContext& context) { if (context.getPeerName().compare(master_task_)==0) { long total_hits = 0; @@ -80,12 +77,12 @@ } double pi = 4.0 * total_hits / (msg_count * iterations_); - context.write("Estimated value of PI", pi); + context.write(pi); } } }; int main(int argc, char *argv[]) { - return HamaPipes::runTask(HamaPipes::TemplateFactory()); + return HamaPipes::runTask(HamaPipes::TemplateFactory()); } Index: c++/src/main/native/examples/impl/matrixmultiplication.cc =================================================================== --- c++/src/main/native/examples/impl/matrixmultiplication.cc (revision 1556897) +++ c++/src/main/native/examples/impl/matrixmultiplication.cc (working copy) @@ -130,7 +130,7 @@ }; -class MatrixRowPartitioner: public Partitioner { +class MatrixRowPartitioner: public Partitioner { public: MatrixRowPartitioner(BSPContext& context) { } Index: c++/src/main/native/examples/impl/summation.cc =================================================================== --- c++/src/main/native/examples/impl/summation.cc (revision 1556897) +++ c++/src/main/native/examples/impl/summation.cc (working copy) @@ -29,25 +29,27 @@ using HamaPipes::BSP; using HamaPipes::BSPContext; -class SummationBSP: public BSP { - private: +class SummationBSP: public BSP { +private: string master_task_; - public: - SummationBSP(BSPContext& context) { } +public: + SummationBSP(BSPContext& context) { } - void setup(BSPContext& context) { + void setup(BSPContext& context) { // Choose one as a master master_task_ = context.getPeerName(context.getNumPeers() / 2); } - void bsp(BSPContext& context) { + void bsp(BSPContext& context) { double intermediate_sum = 0.0; string key; string value; while(context.readNext(key,value)) { + // We are using the KeyValueTextInputFormat, + // therefore we have to convert string value to double intermediate_sum += HadoopUtils::toDouble(value); } @@ -55,7 +57,7 @@ context.sync(); } - void cleanup(BSPContext& context) { + void cleanup(BSPContext& context) { if (context.getPeerName().compare(master_task_)==0) { double sum = 0.0; @@ -63,12 +65,12 @@ for (int i=0; i < msg_count; i++) { sum += context.getCurrentMessage(); } - context.write("Sum", sum); + context.write(sum); } } }; int main(int argc, char *argv[]) { - return HamaPipes::runTask(HamaPipes::TemplateFactory()); + return HamaPipes::runTask(HamaPipes::TemplateFactory()); } Index: c++/src/main/native/examples/conf/summation.xml =================================================================== --- c++/src/main/native/examples/conf/summation.xml (revision 1556897) +++ c++/src/main/native/examples/conf/summation.xml (working copy) @@ -22,14 +22,6 @@ hdfs:/examples/bin/summation - hama.pipes.java.recordreader - true - - - hama.pipes.java.recordwriter - true - - bsp.input.format.class org.apache.hama.bsp.KeyValueTextInputFormat @@ -47,7 +39,7 @@ bsp.output.key.class - org.apache.hadoop.io.Text + org.apache.hadoop.io.NullWritable bsp.output.value.class Index: c++/src/main/native/examples/conf/piestimator.xml =================================================================== --- c++/src/main/native/examples/conf/piestimator.xml (revision 1556897) +++ c++/src/main/native/examples/conf/piestimator.xml (working copy) @@ -22,14 +22,6 @@ hdfs:/examples/bin/piestimator - hama.pipes.java.recordreader - true - - - hama.pipes.java.recordwriter - true - - bsp.input.format.class org.apache.hama.bsp.NullInputFormat @@ -39,7 +31,7 @@ bsp.output.key.class - org.apache.hadoop.io.Text + org.apache.hadoop.io.NullWritable bsp.output.value.class Index: c++/src/main/native/examples/conf/matrixmultiplication.xml =================================================================== --- c++/src/main/native/examples/conf/matrixmultiplication.xml (revision 1556897) +++ c++/src/main/native/examples/conf/matrixmultiplication.xml (working copy) @@ -22,14 +22,6 @@ hdfs:/examples/bin/matrixmultiplication - hama.pipes.java.recordreader - true - - - hama.pipes.java.recordwriter - true - - bsp.input.dir /examples/input/matrixmultiplication/MatrixA.seq Index: c++/src/main/native/pipes/impl/Pipes.cc =================================================================== --- c++/src/main/native/pipes/impl/Pipes.cc (revision 1556897) +++ c++/src/main/native/pipes/impl/Pipes.cc (working copy) @@ -21,13 +21,17 @@ /********************************************/ class BinaryUpwardProtocol: public UpwardProtocol { private: - FileOutStream* out_stream_; + HadoopUtils::FileOutStream* out_stream_; public: BinaryUpwardProtocol(FILE* out_stream) { - out_stream_ = new FileOutStream(); + out_stream_ = new HadoopUtils::FileOutStream(); HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream"); } + virtual void sendCommand(int32_t cmd) { + sendCommand(cmd, true); + } + /* local sendCommand function */ void sendCommand(int32_t cmd, bool flush) { serialize(cmd, *out_stream_); @@ -40,7 +44,7 @@ } } - template + template void sendCommand(int32_t cmd, T value) { sendCommand(cmd, false); // Write out generic value @@ -52,7 +56,7 @@ } } - template + template void sendCommand(int32_t cmd, const T values[], int size) { sendCommand(cmd, false); // Write out generic values @@ -66,7 +70,7 @@ out_stream_->flush(); } - template + template void sendCommand(int32_t cmd, T1 value1, T2 value2) { sendCommand(cmd, false); // Write out generic value1 @@ -84,7 +88,31 @@ out_stream_->flush(); } - template + template + void sendCommand(int32_t cmd, T1 value1, T2 value2, T3 value3) { + sendCommand(cmd, false); + // Write out generic value1 + serialize(value1, *out_stream_); + if(logging) { + fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param1: '%s'\n", + messageTypeNames[cmd], toString(value1).c_str()); + } + // Write out generic value2 + serialize(value2, *out_stream_); + if(logging) { + fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param2: '%s'\n", + messageTypeNames[cmd], toString(value2).c_str()); + } + // Write out generic value3 + serialize(value3, *out_stream_); + if(logging) { + fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param3: '%s'\n", + messageTypeNames[cmd], toString(value3).c_str()); + } + out_stream_->flush(); + } + + template void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) { sendCommand(cmd, false); // Write out generic value @@ -104,27 +132,6 @@ out_stream_->flush(); } - virtual void sendCommand(int32_t cmd) { - sendCommand(cmd, true); - } - - /* - virtual void registerCounter(int id, const string& group, - const string& name) { - serialize(REGISTER_COUNTER, *stream); - serialize(id, *stream); - serialize(group, *stream); - serialize(name, *stream); - } - - virtual void incrementCounter(const TaskContext::Counter* counter, - uint64_t amount) { - serialize(INCREMENT_COUNTER, *stream); - serialize(counter->getId(), *stream); - serialize(amount, *stream); - } - */ - virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { serialize(INCREMENT_COUNTER, *out_stream_); serialize(group, *out_stream_); @@ -141,19 +148,97 @@ } }; +#define NEXT_EVENT_COMMANDS \ + case START_MESSAGE: {\ + int32_t protocol_version = deserialize(*in_stream_);\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n",\ + protocol_version);\ + }\ + handler_->start(protocol_version);\ + break;\ + }\ + case SET_BSPJOB_CONF: {\ + int32_t entries = deserialize(*in_stream_);\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",\ + entries);\ + }\ + vector properties(entries*2);\ + for(int i=0; i < entries*2; ++i) {\ + string item = deserialize(*in_stream_);\ + properties.push_back(item);\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",\ + item.c_str());\ + }\ + }\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",\ + entries);\ + }\ + handler_->setBSPJob(properties);\ + break;\ + }\ + case SET_INPUT_TYPES: {\ + string key_type = deserialize(*in_stream_);\ + string value_type = deserialize(*in_stream_);\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",\ + key_type.c_str(), value_type.c_str());\ + }\ + handler_->setInputTypes(key_type, value_type);\ + break;\ + }\ + case RUN_SETUP: {\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");\ + }\ + handler_->runSetup();\ + break;\ + }\ + case RUN_BSP: {\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");\ + }\ + handler_->runBsp();\ + break;\ + }\ + case RUN_CLEANUP: {\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");\ + }\ + handler_->runCleanup();\ + break;\ + }\ + case CLOSE: {\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");\ + }\ + handler_->close();\ + break;\ + }\ + case ABORT: {\ + if(logging) {\ + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");\ + }\ + handler_->abort();\ + break;\ + } + /********************************************/ /************** BinaryProtocol **************/ /********************************************/ -template -class BinaryProtocol: public Protocol< BinaryProtocol > { +template +class BinaryProtocol: public Protocol< BinaryProtocol, K1, V1 > { private: - FileInStream* in_stream_; - DownwardProtocol* handler_; + HadoopUtils::FileInStream* in_stream_; + DownwardProtocol* handler_; BinaryUpwardProtocol* uplink_; public: - BinaryProtocol(FILE* in_stream, DownwardProtocol* handler, FILE* uplink) { - in_stream_ = new FileInStream(); + BinaryProtocol(FILE* in_stream, DownwardProtocol* handler, FILE* uplink) { + in_stream_ = new HadoopUtils::FileInStream(); in_stream_->open(in_stream); uplink_ = new BinaryUpwardProtocol(uplink); handler_ = handler; @@ -164,130 +249,85 @@ } /** - * Wait for next event, but don't expect a response for - * a previously sent command + * Wait for next event and handle it */ void nextEvent() { // read command - int32_t cmd; - cmd = deserializeInt(*in_stream_); + int32_t cmd = deserialize(*in_stream_); switch (cmd) { - case START_MESSAGE: { - int32_t protocol_version; - protocol_version = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n", - protocol_version); - } - handler_->start(protocol_version); - break; + NEXT_EVENT_COMMANDS + + default: { + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n", + cmd); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd)); } - // setup BSP Job Configuration - case SET_BSPJOB_CONF: { - int32_t entries; - entries = deserialize(*in_stream_); + } + } + + /** + * Wait for next event and handle it + */ + template + void nextEvent() { + // read command + int32_t cmd = deserialize(*in_stream_); + + switch (cmd) { + + NEXT_EVENT_COMMANDS + + case PARTITION_REQUEST: { if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", - entries); + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); } - vector properties(entries*2); - for(int i=0; i < entries*2; ++i) { - string item; - item = deserialize(*in_stream_); - properties.push_back(item); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", - item.c_str()); - } - } - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", - entries); - } - handler_->setBSPJob(properties); + + K partion_key = deserialize(*in_stream_); + V partion_value = deserialize(*in_stream_); + int32_t num_tasks = deserialize(*in_stream_); + + handler_->runPartition(partion_key, partion_value, num_tasks); + break; } - case SET_INPUT_TYPES: { - string key_type; - string value_type; - key_type = deserialize(*in_stream_); - value_type = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n", - key_type.c_str(), value_type.c_str()); - } - handler_->setInputTypes(key_type, value_type); - break; + default: { + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n", + cmd); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd)); } - case RUN_SETUP: { - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); - } - int32_t piped_input; - int32_t piped_output; - piped_input = deserialize(*in_stream_); - piped_output = deserialize(*in_stream_); - handler_->runSetup(piped_input, piped_output); - break; - } - case RUN_BSP: { - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); - } - int32_t piped_input; - int32_t piped_output; - piped_input = deserialize(*in_stream_); - piped_output = deserialize(*in_stream_); - handler_->runBsp(piped_input, piped_output); - break; - } - case RUN_CLEANUP: { - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); - } - int32_t piped_input; - int32_t piped_output; - piped_input = deserialize(*in_stream_); - piped_output = deserialize(*in_stream_); - handler_->runCleanup(piped_input, piped_output); - break; - } + } + } + + /** + * Wait for next event and handle it + */ + template + void nextEvent() { + // read command + int32_t cmd = deserialize(*in_stream_); + + switch (cmd) { + + NEXT_EVENT_COMMANDS + case PARTITION_REQUEST: { if(logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); } - K1 partion_key; - V1 partion_value; - int32_t num_tasks; + T partion_key_or_value = deserialize(*in_stream_); + int32_t num_tasks = deserialize(*in_stream_); - partion_key = deserialize(*in_stream_); - partion_value = deserialize(*in_stream_); - num_tasks = deserialize(*in_stream_); + handler_->runPartition(partion_key_or_value, num_tasks); - handler_->runPartition(partion_key, partion_value, num_tasks); break; } - case CLOSE: { - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); - } - handler_->close(); - break; - } - case ABORT: { - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); - } - handler_->abort(); - break; - } default: { - HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n", cmd); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd)); } } } @@ -306,11 +346,11 @@ /** * Wait for next event, which should be a response for * a previously sent command (expected_response_cmd) - * and return the generic result + * + * Returns a generic result */ - template + template T getResult(int32_t expected_response_cmd) { - T result = T(); // read response command @@ -320,102 +360,50 @@ if (expected_response_cmd == cmd) { switch (cmd) { - - case GET_MSG_COUNT: { - T msg_count; - msg_count = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msg_count: '%s'\n", - toString(msg_count).c_str()); - } - return msg_count; - } - case GET_MSG: { - T msg; - msg = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: '%s'\n", - toString(msg).c_str()); - } - return msg; - } - case GET_PEERNAME: { - T peername; - peername = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n", - toString(peername).c_str()); - } - return peername; - } - case GET_PEER_INDEX: { - T peer_index = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peer_index: '%s'\n", - toString(peer_index).c_str()); - } - return peer_index; - } - case GET_PEER_COUNT: { - T peer_count = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peer_count: '%s'\n", - toString(peer_count).c_str()); - } - return peer_count; - } - case GET_SUPERSTEP_COUNT: { - T superstep_count = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstep_count: '%s'\n", - toString(superstep_count).c_str()); - } - return superstep_count; - } - - case SEQFILE_OPEN: { - T file_id = deserialize(*in_stream_); - if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN file_id: '%s'\n", - toString(file_id).c_str()); - } - return file_id; - } - case SEQFILE_APPEND: { + case GET_MSG_COUNT: + case GET_MSG: + case GET_PEERNAME: + case GET_PEER_INDEX: + case GET_PEER_COUNT: + case GET_SUPERSTEP_COUNT: + case SEQFILE_OPEN: + case SEQFILE_APPEND: + case SEQFILE_CLOSE: { result = deserialize(*in_stream_); if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: '%s'\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got '%s' result: '%s'\n", + messageTypeNames[cmd], toString(result).c_str()); } return result; } - case SEQFILE_CLOSE: { - result = deserialize(*in_stream_); + } + } else { // Not expected response + + switch (cmd) { + case END_OF_DATA: { if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: '%s'\n", - toString(result).c_str()); + fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got END_OF_DATA\n"); } return result; } } - // Not expected response - } else { - + // TODO /* case CLOSE: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got CLOSE\n"); handler_->close(); break; } case ABORT: { - if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got ABORT\n"); handler_->abort(); break; } */ - HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getResult(expected_response_cmd=%d) - Unknown binary command: %d\n", expected_response_cmd, cmd); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getResult: Unknown binary command " + toString(cmd)); } return result; } @@ -423,11 +411,11 @@ /** * Wait for next event, which should be a response for * a previously sent command (expected_response_cmd) - * and return the generic vector result list + * + * Returns the generic vector result list */ - template + template vector getVectorResult(int32_t expected_response_cmd) { - vector results; // read response command @@ -442,14 +430,14 @@ T peername; int32_t peername_count = deserialize(*in_stream_); if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername_count: %d\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult - got GET_ALL_PEERNAME peername_count: %d\n", peername_count); } for (int i=0; i(*in_stream_); peernames.push_back(peername); if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: '%s'\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult - got GET_ALL_PEERNAME peername: '%s'\n", toString(peername).c_str()); } } @@ -457,9 +445,9 @@ } } } else { - HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult(%d) - Unknown binary command: %d\n", expected_response_cmd, cmd); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getVectorResult: Unknown binary command " + toString(cmd)); } return results; } @@ -467,12 +455,12 @@ /** * Wait for next event, which should be a response for * a previously sent command (expected_response_cmd) - * and return the generic KeyValuePair or an empty one + * + * Returns the generic KeyValuePair or an empty one * if no data is available */ - template + template KeyValuePair getKeyValueResult(int32_t expected_response_cmd) { - KeyValuePair key_value_pair; // read response command @@ -483,14 +471,16 @@ switch (cmd) { - case READ_KEYVALUE: { + case READ_KEYVALUE: + case SEQFILE_READNEXT: { K key = deserialize(*in_stream_); V value = deserialize(*in_stream_); if(logging) { string k = toString(key); string v = toString(value); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: '%s' value: '%s'\n", + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got '%s' key: '%s' value: '%s'\n", + messageTypeNames[cmd], ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()), ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) ); } @@ -498,34 +488,70 @@ key_value_pair = pair(key, value); return key_value_pair; } + case END_OF_DATA: { + key_value_pair = KeyValuePair(true); + if(logging) { + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got END_OF_DATA\n"); + } + } + + } + } else { + key_value_pair = KeyValuePair(true); + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult(expected_cmd = %d) - Unknown binary command: %d\n", + expected_response_cmd, cmd); + fprintf(stderr,"Error: Please verfiy serialization! The key or value type could possibly not be deserialized!\n"); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getKeyValueResult: Unknown binary command " + toString(cmd)); + } + return key_value_pair; + } + + /** + * Wait for next event, which should be a response for + * a previously sent command (expected_response_cmd) + * + * Returns the KeyValuePair with one value only + * or an empty one if no data is available + */ + template + KeyValuePair getKeyValueResult(int32_t expected_response_cmd) { + KeyValuePair key_value_pair; + + // read response command + int32_t cmd = deserialize(*in_stream_); + + // check if response is expected or END_OF_DATA + if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) { + + switch (cmd) { + + case READ_KEYVALUE: case SEQFILE_READNEXT: { - K key = deserialize(*in_stream_); - V value = deserialize(*in_stream_); + T value = deserialize(*in_stream_); if(logging) { - string k = toString(key); - string v = toString(value); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: '%s' value: '%s'\n", - ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()), + string v = toString(value); + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got '%s' value: '%s'\n", + messageTypeNames[cmd], ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) ); } - key_value_pair = pair(key, value); + key_value_pair = pair(value, value); return key_value_pair; } case END_OF_DATA: { - key_value_pair = KeyValuePair(true); + key_value_pair = KeyValuePair(true); if(logging) { - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got END_OF_DATA\n"); + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got END_OF_DATA\n"); } } } } else { - key_value_pair = KeyValuePair(true); - fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(expected_cmd = %d) - Unknown binary command: %d\n", + key_value_pair = KeyValuePair(true); + fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult(expected_cmd = %d) - Unknown binary command: %d\n", expected_response_cmd, cmd); - fprintf(stderr,"ERORR: Please verfiy serialization! The key or value type could possibly not be deserialized!\n"); - HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); + fprintf(stderr,"Error: Please verfiy serialization! The key or value type could possibly not be deserialized!\n"); + HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getKeyValueResult: Unknown binary command " + toString(cmd)); } return key_value_pair; } @@ -540,16 +566,14 @@ /********************************************/ /************** BSPContextImpl **************/ /********************************************/ -template -class BSPContextImpl: public BSPContext, public DownwardProtocol { +template +class BSPContextImpl: public BSPContext, public DownwardProtocol, K1, V1> { private: const Factory* factory_; BSPJob* job_; BSP* bsp_; - Partitioner* partitioner_; - RecordReader* reader_; - RecordWriter* writer_; - Protocol< BinaryProtocol >* protocol_; + Partitioner* partitioner_; + Protocol< BinaryProtocol, K1, V1>, K1, V1 >* protocol_; UpwardProtocol* uplink_; bool done_; @@ -563,13 +587,10 @@ public: BSPContextImpl(const Factory& factory) { - factory_ = &factory; job_ = NULL; bsp_ = NULL; partitioner_ = NULL; - reader_ = NULL; - writer_ = NULL; protocol_ = NULL; uplink_ = NULL; @@ -586,8 +607,8 @@ /********************************************/ virtual void start(int protocol_version) { if (protocol_version != 0) { - throw Error("Protocol version " + toString(protocol_version) + - " not supported"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::start Protocol version " + toString(protocol_version) + + " not supported"); } partitioner_ = factory_->createPartitioner(*this); } @@ -595,7 +616,7 @@ virtual void setBSPJob(vector values) { int len = values.size(); BSPJobImpl* result = new BSPJobImpl(); - HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values"); + HADOOP_ASSERT(len % 2 == 0, "HamaPipes::BSPContextImpl::setBSPJob Odd length of job conf values"); for(int i=0; i < len; i += 2) { result->set(values[i], values[i+1]); } @@ -606,36 +627,7 @@ inputClass_ = pair(key_type, value_type); } - /* local method */ - void setupReaderWriter(bool piped_input, bool piped_output) { - - if(logging) { - fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n", - (piped_input)?"true":"false", (piped_output)?"true":"false"); - } - - if (piped_input && reader_==NULL) { - reader_ = factory_->createRecordReader(*this); - HADOOP_ASSERT((reader_ == NULL) == piped_input, - piped_input ? "RecordReader defined when not needed.": - "RecordReader not defined"); - - //if (reader != NULL) { - // value = new string(); - //} - } - - if (piped_output && writer_==NULL) { - writer_ = factory_->createRecordWriter(*this); - HADOOP_ASSERT((writer_ == NULL) == piped_output, - piped_output ? "RecordWriter defined when not needed.": - "RecordWriter not defined"); - } - } - - virtual void runSetup(bool piped_input, bool piped_output) { - setupReaderWriter(piped_input, piped_output); - + virtual void runSetup() { if (bsp_ == NULL) { bsp_ = factory_->createBSP(*this); } @@ -648,9 +640,7 @@ } } - virtual void runBsp(bool piped_input, bool piped_output) { - setupReaderWriter(piped_input, piped_output); - + virtual void runBsp() { if (bsp_ == NULL) { bsp_ = factory_->createBSP(*this); } @@ -663,9 +653,7 @@ } } - virtual void runCleanup(bool piped_input, bool piped_output) { - setupReaderWriter(piped_input, piped_output); - + virtual void runCleanup() { if (bsp_ != NULL) { has_task_ = true; bsp_->cleanup(*this); @@ -674,20 +662,6 @@ } } - /********************************************/ - /******* Partitioner *******/ - /********************************************/ - virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) { - if (partitioner_ != NULL) { - int part = partitioner_->partition(key, value, num_tasks); - uplink_->sendCommand(PARTITION_RESPONSE, part); - } else { - if(logging) { - fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); - } - } - } - virtual void close() { pthread_mutex_lock(&mutex_done_); done_ = true; @@ -701,10 +675,37 @@ } virtual void abort() { - throw Error("Aborted by driver"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::abort Aborted by driver"); } /********************************************/ + /***** DownwardProtocolPartition IMPL *******/ + /********************************************/ + template + void runPartition(const K& key, const V& value, int32_t num_tasks) { + if (partitioner_ != NULL) { + int part = partitioner_->partition(key, value, num_tasks); + uplink_->sendCommand(PARTITION_RESPONSE, part); + } else { + if(logging) { + fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); + } + } + } + + template + void runPartition(const T& key_or_value, int32_t num_tasks) { + if (partitioner_ != NULL) { + int part = partitioner_->partition(key_or_value, num_tasks); + uplink_->sendCommand(PARTITION_RESPONSE, part); + } else { + if(logging) { + fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); + } + } + } + + /********************************************/ /************** TaskContext IMPL ************/ /********************************************/ @@ -716,32 +717,16 @@ } /** - * Get the current key. - * @return the current key or NULL if called before the first map or reduce - */ - //virtual const string& getInputKey() { - // return key; - //} - - /** - * Get the current value. - * @return the current value or NULL if called before the first map or - * reduce - */ - //virtual const string& getInputValue() { - // return *value; - //} - - /** * Register a counter with the given group and name. */ virtual long getCounter(const string& group, const string& name) { - // TODO - // int id = registeredCounterIds.size(); - // registeredCounterIds.push_back(id); - // uplink->registerCounter(id, group, name); - // return new Counter(id); - return 0; + // TODO + + // int id = registeredCounterIds.size(); + // registeredCounterIds.push_back(id); + // uplink->registerCounter(id, group, name); + // return new Counter(id); + return 0; } /** @@ -754,7 +739,7 @@ // Verify response command bool response = protocol_->verifyResult(INCREMENT_COUNTER); if (response == false) { - throw Error("incrementCounter received wrong response!"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::incrementCounter received wrong response!"); } } @@ -784,38 +769,8 @@ } /** - * Send a data with a tag to another BSPSlave corresponding to hostname. - * Messages sent by this method are not guaranteed to be received in a sent - * order. + * Returns the number of messages in the peer's received messages queue. */ - virtual void sendMessage(const string& peer_name, const M& msg) { - uplink_->sendCommand(SEND_MSG, peer_name, msg); - - // Verify response command - bool response = protocol_->verifyResult(SEND_MSG); - if (response == false) { - throw Error("sendMessage received wrong response!"); - } - } - - /** - * @return A message from the peer's received messages queue (a FIFO). - */ - virtual M getCurrentMessage() { - uplink_->sendCommand(GET_MSG); - - M message = protocol_->template getResult(GET_MSG); - - if(logging) { - fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n", - toString(message).c_str()); - } - return message; - } - - /** - * @return The number of messages in the peer's received messages queue. - */ virtual int getNumCurrentMessages() { uplink_->sendCommand(GET_MSG_COUNT); @@ -840,12 +795,12 @@ // Verify response command bool response = protocol_->verifyResult(SYNC); if (response == false) { - throw Error("sync received wrong response!"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::sync received wrong response!"); } } /** - * @return the name of this peer in the format "hostname:port". + * Returns the name of this peer in the format "hostname:port". */ virtual string getPeerName() { // submit id=-1 to receive own peername @@ -861,7 +816,7 @@ } /** - * @return the name of n-th peer from sorted array by name. + * Returns the name of n-th peer from sorted array by name. */ virtual string getPeerName(int index) { uplink_->sendCommand(GET_PEERNAME, index); @@ -872,24 +827,21 @@ fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - result: %s\n", result.c_str()); } - return result; } /** - * @return the names of all the peers executing tasks from the same job + * Returns the names of all the peers executing tasks from the same job * (including this peer). */ virtual vector getAllPeerNames() { uplink_->sendCommand(GET_ALL_PEERNAME); - vector results = protocol_->template getVectorResult(GET_ALL_PEERNAME); - - return results; + return protocol_->template getVectorResult(GET_ALL_PEERNAME); } /** - * @return the index of this peer from sorted array by name. + * Returns the index of this peer from sorted array by name. */ virtual int getPeerIndex() { uplink_->sendCommand(GET_PEER_INDEX); @@ -904,7 +856,7 @@ } /** - * @return the number of peers + * Returns the number of peers */ virtual int getNumPeers() { uplink_->sendCommand(GET_PEER_COUNT); @@ -919,7 +871,7 @@ } /** - * @return the count of current super-step + * Returns the count of current super-step */ virtual long getSuperstepCount() { uplink_->sendCommand(GET_SUPERSTEP_COUNT); @@ -942,36 +894,35 @@ // Verify response command bool response = protocol_->verifyResult(CLEAR); if (response == false) { - throw Error("clear received wrong response!"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::clear received wrong response!"); } } /** - * Writes a key/value pair to the output collector + * Closes the input and opens it right away, so that the file pointer is at + * the beginning again. */ - virtual void write(const K2& key, const V2& value) { - if (writer_ != NULL) { - writer_->emit(key, value); // TODO writer not implemented - } else { - uplink_->sendCommand(WRITE_KEYVALUE, key, value); - } - + virtual void reopenInput() { + uplink_->sendCommand(REOPEN_INPUT); + // Verify response command - bool response = protocol_->verifyResult(WRITE_KEYVALUE); + bool response = protocol_->verifyResult(REOPEN_INPUT); if (response == false) { - throw Error("write received wrong response!"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::reopenInput received wrong response!"); } } + /********************************************/ + /******* Reader IMPL *******/ + /********************************************/ /** - * Deserializes the next input key value into the given objects; + * Deserializes the next input key value into the given objects */ - virtual bool readNext(K1& key, V1& value) { - + template + bool readNext(K& key, V& value) { uplink_->sendCommand(READ_KEYVALUE); - KeyValuePair key_value_pair; - key_value_pair = protocol_->template getKeyValueResult(READ_KEYVALUE); + KeyValuePair key_value_pair = protocol_->template getKeyValueResult(READ_KEYVALUE); if (!key_value_pair.is_empty) { key = key_value_pair.first; @@ -986,27 +937,99 @@ } /** - * Closes the input and opens it right away, so that the file pointer is at - * the beginning again. + * Deserializes the next input key OR value into the given object */ - virtual void reopenInput() { - uplink_->sendCommand(REOPEN_INPUT); + template + bool readNext(T& key_or_value) { + uplink_->sendCommand(READ_KEYVALUE); + KeyValuePair key_value_pair = protocol_->template getKeyValueResult(READ_KEYVALUE); + + if (!key_value_pair.is_empty) { + key_or_value = key_value_pair.first; + } + + if (logging && key_value_pair.is_empty) { + fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n"); + } + + return (!key_value_pair.is_empty); + } + + /********************************************/ + /******* Writer IMPL *******/ + /********************************************/ + /** + * Writes a key/value pair to the output collector + */ + template + void write(const K& key, const V& value) { + uplink_->sendCommand(WRITE_KEYVALUE, key, value); + // Verify response command - bool response = protocol_->verifyResult(REOPEN_INPUT); + bool response = protocol_->verifyResult(WRITE_KEYVALUE); if (response == false) { - throw Error("reopenInput received wrong response!"); + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::write received wrong response!"); } } + /** + * Write key OR value to the output collector + */ + template + void write(const T& key_or_value) { + uplink_->sendCommand(WRITE_KEYVALUE, key_or_value); + + // Verify response command + bool response = protocol_->verifyResult(WRITE_KEYVALUE); + if (response == false) { + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::write received wrong response!"); + } + } /********************************************/ + /******* Messenger IMPL *******/ + /********************************************/ + /** + * Send a data with a tag to another BSPSlave corresponding to hostname. + * Messages sent by this method are not guaranteed to be received in a sent + * order. + */ + template + void sendMessage(const string& peer_name, const T& msg) { + uplink_->sendCommand(SEND_MSG, peer_name, msg); + + // Verify response command + bool response = protocol_->verifyResult(SEND_MSG); + if (response == false) { + throw HadoopUtils::Error("HamaPipes::BSPContextImpl::sendMessage received wrong response!"); + } + } + + /** + * Returns a message from the peer's received messages queue (a FIFO). + */ + template + T getCurrentMessage() { + uplink_->sendCommand(GET_MSG); + + M message = protocol_->template getResult(GET_MSG); + + if(logging) { + fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n", + toString(message).c_str()); + } + return message; + } + + /********************************************/ /******* SequenceFileConnector IMPL *******/ /********************************************/ /** * Open SequenceFile with opion "r" or "w" - * @return the corresponding fileID + * + * Returns the corresponding fileID */ virtual int32_t sequenceFileOpen(const string& path, const string& option, const string& key_type, const string& value_type) { @@ -1056,15 +1079,13 @@ * Read next key/value pair from the SequenceFile with fileID * Using Curiously recurring template pattern(CTRP) */ - template + template bool sequenceFileReadNext(int32_t file_id, K& key, V& value) { - // send request uplink_->sendCommand(SEQFILE_READNEXT, file_id); // get response - KeyValuePair key_value_pair; - key_value_pair = protocol_->template getKeyValueResult(SEQFILE_READNEXT); + KeyValuePair key_value_pair = protocol_->template getKeyValueResult(SEQFILE_READNEXT); if (!key_value_pair.is_empty) { key = key_value_pair.first; @@ -1072,20 +1093,40 @@ } if (logging && key_value_pair.is_empty) { - fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n"); + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - END_OF_DATA\n"); } return (!key_value_pair.is_empty); } /** + * Read next key/value pair from the SequenceFile with fileID + * key OR value type is NullWritable + */ + template + bool sequenceFileReadNext(int32_t file_id, T& key_or_value) { + uplink_->sendCommand(SEQFILE_READNEXT, file_id); + + KeyValuePair key_value_pair = protocol_->template getKeyValueResult(SEQFILE_READNEXT); + + if (!key_value_pair.is_empty) { + key_or_value = key_value_pair.first; + } + + if (logging && key_value_pair.is_empty) { + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - END_OF_DATA\n"); + } + + return (!key_value_pair.is_empty); + } + + /** * Append the next key/value pair to the SequenceFile with fileID * Using Curiously recurring template pattern(CTRP) */ - template + template bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) { - string values[] = {key, value}; - uplink_->sendCommand(SEQFILE_APPEND, file_id, values, 2); + uplink_->sendCommand(SEQFILE_APPEND, file_id, key, value); int result = protocol_->template getResult(SEQFILE_APPEND); @@ -1098,11 +1139,30 @@ return (result==1); } + /** + * Append the next key/value pair to the SequenceFile with fileID + * key OR value type is NullWritable + */ + template + bool sequenceFileAppend(int32_t file_id, const T& key_or_value) { + uplink_->sendCommand(SEQFILE_APPEND, file_id, key_or_value); + + int result = protocol_->template getResult(SEQFILE_APPEND); + + if (logging && result==0) { + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Nothing appended!\n"); + } else if (logging) { + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Successfully appended!\n"); + } + + return (result==1); + } + /********************************************/ /*************** Other STUFF ***************/ /********************************************/ - void setProtocol(Protocol< BinaryProtocol >* protocol, UpwardProtocol* uplink) { + void setProtocol(Protocol< BinaryProtocol, K1, V1>, K1, V1 >* protocol, UpwardProtocol* uplink) { protocol_ = protocol; uplink_ = uplink; } @@ -1114,20 +1174,6 @@ return done_copy; } - /** - * Advance to the next value. - */ - /* - bool nextValue() { - if (isNewKey || done) { - return false; - } - isNewValue = false; - //progress(); - protocol->nextEvent(); - return isNewValue; - } - */ void waitForTask() { while (!done_ && !has_task_) { if(logging) { @@ -1137,45 +1183,11 @@ protocol_->nextEvent(); } } - /* - bool nextKey() { - if (reader == NULL) { - while (!isNewKey) { - nextValue(); - if (done) { - return false; - } - } - key = *newKey; - } else { - if (!reader->next(key, const_cast(*value))) { - pthread_mutex_lock(&mutexDone); - done = true; - pthread_mutex_unlock(&mutexDone); - return false; - } - //progressFloat = reader->getProgress(); - } - isNewKey = false; - - if (bsp != NULL) { - bsp->bsp(*this); - } - return true; - } - */ + void closeAll() { - if (reader_) { - reader_->close(); - } - if (bsp_) { bsp_->close(); } - - if (writer_) { - writer_->close(); - } } virtual ~BSPContextImpl() { @@ -1183,8 +1195,6 @@ delete job_; delete bsp_; delete partitioner_; - delete reader_; - delete writer_; delete protocol_; delete uplink_; //delete inputSplit_; @@ -1195,7 +1205,7 @@ /** * Ping the parent every 5 seconds to know if it is alive */ -template +template void* ping(void* ptr) { BSPContextImpl* context = (BSPContextImpl*) ptr; char* portStr = getenv("hama.pipes.command.port"); @@ -1211,7 +1221,7 @@ string("problem creating socket: ") + strerror(errno)); sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_port = htons(toInt(portStr)); + addr.sin_port = htons(HadoopUtils::toInt(portStr)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if(logging) { fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", @@ -1229,7 +1239,7 @@ HADOOP_ASSERT(result == 0, "problem closing socket"); } remaining_retries = MAX_RETRIES; - } catch (Error& err) { + } catch (HadoopUtils::Error& err) { if (!context->isDone()) { fprintf(stderr, "Hama Pipes Exception: in ping %s\n", err.getMessage().c_str()); @@ -1249,21 +1259,22 @@ * Run the assigned task in the framework. * The user's main function should set the various functions using the * set* functions above and then call this. - * @return true, if the task succeeded. + * + * Returns true, if the task succeeded. */ -template +template bool runTask(const Factory& factory) { try { HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL, "No environment found!"); - logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true; + logging = (HadoopUtils::toInt(getenv("hama.pipes.logging"))==0)?false:true; if (logging) { fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", ((logging)?"true":"false")); } BSPContextImpl* context = new BSPContextImpl(factory); - Protocol< BinaryProtocol >* protocol; + Protocol< BinaryProtocol, K1, V1>, K1, V1 >* protocol; char* port_str = getenv("hama.pipes.command.port"); int sock = -1; @@ -1277,7 +1288,7 @@ string("problem creating socket: ") + strerror(errno)); sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_port = htons(toInt(port_str)); + addr.sin_port = htons(HadoopUtils::toInt(port_str)); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, string("problem connecting command socket: ") + @@ -1298,7 +1309,7 @@ HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for out_stream: ") + strerror(errno)); - protocol = new BinaryProtocol(in_stream, context, out_stream); + protocol = new BinaryProtocol, K1, V1>(in_stream, context, out_stream); if(logging) { fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", port_str); @@ -1310,7 +1321,7 @@ out_filename += ".out"; in_stream = fopen(filename, "r"); out_stream = fopen(out_filename.c_str(), "w"); - protocol = new BinaryProtocol(in_stream, context, out_stream); + protocol = new BinaryProtocol, K1, V1>(in_stream, context, out_stream); } else { //protocol = new TextProtocol(stdin, context, stdout); fprintf(stderr,"HamaPipes::runTask - Protocol couldn't be initialized!\n"); @@ -1357,7 +1368,7 @@ return true; - } catch (Error& err) { + } catch (HadoopUtils::Error& err) { fprintf(stderr, "Hama Pipes Exception: %s\n", err.getMessage().c_str()); return false; Index: c++/src/main/native/pipes/api/hama/Pipes.hh =================================================================== --- c++/src/main/native/pipes/api/hama/Pipes.hh (revision 1556897) +++ c++/src/main/native/pipes/api/hama/Pipes.hh (working copy) @@ -44,8 +44,6 @@ using std::vector; using std::pair; -using namespace HadoopUtils; - namespace HamaPipes { // global varibales @@ -91,6 +89,9 @@ stringify( LOG ), stringify( END_OF_DATA ) }; + /********************************************/ + /*************** KeyValuePair ***************/ + /********************************************/ /** * Generic KeyValuePair including is_empty */ @@ -103,10 +104,10 @@ explicit KeyValuePair(bool x) : is_empty(x) {} KeyValuePair(const K& k, const V& v) : base_t(k, v), is_empty(false) {} - template + template KeyValuePair(const pair &p) : base_t(p), is_empty(false) {} - template + template KeyValuePair(const KeyValuePair &p) : base_t(p), is_empty(p.is_empty) {} }; @@ -159,24 +160,24 @@ virtual const string& get(const string& key) const { map::const_iterator itr = values_.find(key); if (itr == values_.end()) { - throw Error("Key " + key + " not found in BSPJob"); + throw HadoopUtils::Error("Key " + key + " not found in BSPJob"); } return itr->second; } virtual int getInt(const string& key) const { const string& val = get(key); - return toInt(val); + return HadoopUtils::toInt(val); } virtual float getFloat(const string& key) const { const string& val = get(key); - return toFloat(val); + return HadoopUtils::toFloat(val); } virtual bool getBoolean(const string& key) const { const string& val = get(key); - return toBool(val); + return HadoopUtils::toBool(val); } }; @@ -250,21 +251,53 @@ }; /********************************************/ + /******** DownwardProtocolPartition *********/ + /********************************************/ + /* DownwardProtocolPartition wraps void template parameter */ + template + class DownwardProtocolPartition { + public: + void runPartition(const K& key, const V& value, int32_t num_tasks) { + static_cast(this)->template runPartition(key, value, num_tasks); + } + }; + + template + class DownwardProtocolPartition { + public: + void runPartition(const K& key, int32_t num_tasks) { + static_cast(this)->template runPartition(key, num_tasks); + } + }; + + template + class DownwardProtocolPartition { + public: + void runPartition(const V& value, int32_t num_tasks) { + static_cast(this)->template runPartition(value, num_tasks); + } + }; + + template + class DownwardProtocolPartition { + public: + /* Partition nothing */ + }; + + /********************************************/ /************* DownwardProtocol *************/ /********************************************/ - template - class DownwardProtocol { + template + class DownwardProtocol : public DownwardProtocolPartition{ public: virtual void start(int protocol_version) = 0; virtual void setBSPJob(vector values) = 0; virtual void setInputTypes(string key_type, string value_type) = 0; - virtual void runBsp(bool piped_input, bool piped_output) = 0; - virtual void runCleanup(bool piped_input, bool piped_output) = 0; - virtual void runSetup(bool piped_input, bool piped_output) = 0; + virtual void runBsp() = 0; + virtual void runCleanup() = 0; + virtual void runSetup() = 0; - virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) = 0; - virtual void close() = 0; virtual void abort() = 0; virtual ~DownwardProtocol() {} @@ -278,22 +311,27 @@ public: virtual void sendCommand(int32_t cmd) = 0; - template + template void sendCommand(int32_t cmd, T value) { static_cast(this)->template sendCommand(cmd, value); } - template + template void sendCommand(int32_t cmd, const T values[], int size) { static_cast(this)->template sendCommand(cmd, values, size); } - template + template void sendCommand(int32_t cmd, T1 value1, T2 value2) { static_cast(this)->template sendCommand(cmd, value1, value2); } - template + template + void sendCommand(int32_t cmd, T1 value1, T2 value2, T3 value3) { + static_cast(this)->template sendCommand(cmd, value1, value2, value3); + } + + template void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) { static_cast(this)->template sendCommand(cmd, value, values, size); } @@ -304,32 +342,75 @@ virtual ~UpwardProtocol() {} }; + /********************************************/ + /*********** ProtocolEventHandler ***********/ + /********************************************/ + /* ProtocolEventHandler wraps void template parameter */ + template + class ProtocolEventHandler { + public: + void nextEvent() { + static_cast(this)->template nextEvent(); + } + }; + + template + class ProtocolEventHandler { + public: + void nextEvent() { + static_cast(this)->template nextEvent(); + } + }; + + template + class ProtocolEventHandler { + public: + void nextEvent() { + static_cast(this)->template nextEvent(); + } + }; + + template + class ProtocolEventHandler { + public: + void nextEvent() { + static_cast(this)->nextEvent(); + } + }; + + /********************************************/ + /*********** BinaryUpwardProtocol ***********/ + /********************************************/ /* Forward definition of BinaryUpwardProtocol to pass to UpwardProtocol */ class BinaryUpwardProtocol; /********************************************/ /***************** Protocol *****************/ /********************************************/ - template - class Protocol { + template + class Protocol: public ProtocolEventHandler { public: - template + template T getResult(int32_t expected_response_cmd) { return static_cast(this)->template getResult(expected_response_cmd); } - template + template vector getVectorResult(int32_t expected_response_cmd) { return static_cast(this)->template getVectorResult(expected_response_cmd); } - template + template KeyValuePair getKeyValueResult(int32_t expected_response_cmd) { return static_cast(this)->template getKeyValueResult(expected_response_cmd); } - virtual void nextEvent() = 0; + template + KeyValuePair getKeyValueResult(int32_t expected_response_cmd) { + return static_cast(this)->template getKeyValueResult(expected_response_cmd); + } + virtual bool verifyResult(int32_t expected_response_cmd) = 0; virtual UpwardProtocol* getUplink() = 0; virtual ~Protocol(){} @@ -362,28 +443,186 @@ * Read next key/value pair from the SequenceFile with fileID * Using Curiously recurring template pattern(CTRP) */ - template + template bool sequenceFileReadNext(int32_t file_id, K& key, V& value) { return static_cast(this)->template sequenceFileReadNext(file_id, key, value); } /** + * Read next key OR value from the SequenceFile with fileID + * key OR value type is NullWritable + */ + template + bool sequenceFileReadNext(int32_t file_id, T& key_or_value) { + return static_cast(this)->template sequenceFileReadNext(file_id, key_or_value); + } + + /** * Append the next key/value pair to the SequenceFile with fileID * Using Curiously recurring template pattern(CTRP) */ - template + template bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) { return static_cast(this)->template sequenceFileAppend(file_id, key, value); } + + /** + * Append the next key OR value pair to the SequenceFile with fileID + * key OR value type is NullWritable + */ + template + bool sequenceFileAppend(int32_t file_id, const T& key_or_value) { + return static_cast(this)->template sequenceFileAppend(file_id, key_or_value); + } }; + /********************************************/ + /****************** Reader ******************/ + /********************************************/ + /* Reader wraps void template parameter */ + template + class Reader { + public: + /** + * Deserializes the next input key value into the given objects + */ + bool readNext(K& key, V& value) { + return static_cast(this)->template readNext(key, value); + } + + /** + * Reads the next key value pair and returns it as a pair. It may reuse a + * {@link KeyValuePair} instance to save garbage collection time. + * + * @return null if there are no records left. + * @throws IOException + */ + //public KeyValuePair readNext() throws IOException; + }; + + template + class Reader { + public: + /** + * Deserializes the next input key into the given object + * value type is NullWritable + */ + bool readNext(K& key) { + return static_cast(this)->template readNext(key); + } + }; + + template + class Reader { + public: + /** + * Deserializes the next input value into the given object + * key type is NullWritable + */ + bool readNext(V& value) { + return static_cast(this)->template readNext(value); + } + }; + + template + class Reader { + public: + /* key AND value type are NullWritable */ + /* Read nothing */ + }; + + /********************************************/ + /****************** Writer ******************/ + /********************************************/ + /* Writer wraps void template parameter */ + template + class Writer { + public: + /** + * Writes a key/value pair to the output collector + */ + void write(const K& key, const V& value) { + static_cast(this)->template write(key, value); + } + }; + + template + class Writer { + public: + /** + * Writes a key to the output collector + * value type is NullWritable + */ + void write(const K& key) { + static_cast(this)->template write(key); + } + }; + + template + class Writer { + public: + /** + * Writes a value to the output collector + * key type is NullWritable + */ + void write(const V& value) { + static_cast(this)->template write(value); + } + }; + + template + class Writer { + public: + /* key AND value type are NullWritable */ + /* Write nothing */ + }; + + /********************************************/ + /**************** Messenger *****************/ + /********************************************/ + /* Messenger wraps void template parameter */ + template + class Messenger { + public: + /** + * Send a data with a tag to another BSPSlave corresponding to hostname. + * Messages sent by this method are not guaranteed to be received in a sent + * order. + */ + void sendMessage(const string& peer_name, const M& msg) { + static_cast(this)->template sendMessage(peer_name, msg); + } + + /** + * @return A message from the peer's received messages queue (a FIFO). + */ + virtual M getCurrentMessage() { + return static_cast(this)->template getCurrentMessage(); + } + }; + + template + class Messenger { + public: + /* message type is NullWritable */ + /* therefore messenger is not available */ + }; + + /********************************************/ + /************** BSPContextImpl **************/ + /********************************************/ /* Forward definition of BSPContextImpl to pass to SequenceFileConnector */ - template + template class BSPContextImpl; - - template - class BSPContext: public TaskContext, public SequenceFileConnector > { + /********************************************/ + /***************** BSPContext ***************/ + /********************************************/ + template + class BSPContext: public TaskContext, public SequenceFileConnector >, + public Reader, K1, V1>, + public Writer, K2, V2>, + public Messenger, M> { public: /** @@ -402,18 +641,6 @@ virtual string getInputValueClass() = 0; /** - * Send a data with a tag to another BSPSlave corresponding to hostname. - * Messages sent by this method are not guaranteed to be received in a sent - * order. - */ - virtual void sendMessage(const string& peer_name, const M& msg) = 0; - - /** - * @return A message from the peer's received messages queue (a FIFO). - */ - virtual M getCurrentMessage() = 0; - - /** * @return The number of messages in the peer's received messages queue. */ virtual int getNumCurrentMessages() = 0; @@ -463,49 +690,37 @@ virtual void clear() = 0; /** - * Writes a key/value pair to the output collector - */ - virtual void write(const K2& key, const V2& value) = 0; - - /** - * Deserializes the next input key value into the given objects; - */ - virtual bool readNext(K1& key, V1& value) = 0; - - /** - * Reads the next key value pair and returns it as a pair. It may reuse a - * {@link KeyValuePair} instance to save garbage collection time. - * - * @return null if there are no records left. - * @throws IOException - */ - //public KeyValuePair readNext() throws IOException; - - /** * Closes the input and opens it right away, so that the file pointer is at * the beginning again. */ virtual void reopenInput() = 0; - }; + /********************************************/ + /****************** Closable ****************/ + /********************************************/ class Closable { public: virtual void close() {} virtual ~Closable() {} }; + /********************************************/ + /******************* BSP ********************/ + /********************************************/ /** * The application's BSP class to do bsp. */ - template + template class BSP: public Closable { public: /** * This method is called before the BSP method. It can be used for setup * purposes. */ - virtual void setup(BSPContext& context) = 0; + virtual void setup(BSPContext& context) { + // empty implementation, because overriding is optional + } /** * This method is your computation method, the main work of your BSP should be @@ -518,49 +733,51 @@ * purposes. Cleanup is guranteed to be called after the BSP runs, even in * case of exceptions. */ - virtual void cleanup(BSPContext& context) = 0; + virtual void cleanup(BSPContext& context) { + // empty implementation, because overriding is optional + } }; + /********************************************/ + /**************** Partitioner ***************/ + /********************************************/ /** * User code to decide where each key should be sent. */ - template + template class Partitioner { public: - virtual int partition(const K1& key, const V1& value, int32_t num_tasks) = 0; virtual ~Partitioner() {} }; - /** - * For applications that want to read the input directly for the map function - * they can define RecordReaders in C++. - */ - template - class RecordReader: public Closable { + template + class Partitioner { public: - virtual bool next(K& key, V& value) = 0; - - /** - * The progress of the record reader through the split as a value between - * 0.0 and 1.0. - */ - virtual float getProgress() = 0; + virtual int partition(const K1& key, int32_t num_tasks) = 0; + virtual ~Partitioner() {} }; - /** - * An object to write key/value pairs as they are emited from the reduce. - */ - template - class RecordWriter: public Closable { + template + class Partitioner { public: - virtual void emit(const K& key, const V& value) = 0; + virtual int partition(const V1& value, int32_t num_tasks) = 0; + virtual ~Partitioner() {} }; + template<> + class Partitioner { + public: + /* Partition nothing */ + }; + + /********************************************/ + /****************** Factory *****************/ + /********************************************/ /** * A factory to create the necessary application objects. */ - template + template class Factory { public: virtual BSP* createBSP(BSPContext& context) const = 0; @@ -570,35 +787,20 @@ * @return the new partitioner or NULL, if the default partitioner should be * used. */ - virtual Partitioner* createPartitioner(BSPContext& context) const { + virtual Partitioner* createPartitioner(BSPContext& context) const { return NULL; } - /** - * Create an application record reader. - * @return the new RecordReader or NULL, if the Java RecordReader should be - * used. - */ - virtual RecordReader* createRecordReader(BSPContext& context) const { - return NULL; - } - - /** - * Create an application record writer. - * @return the new RecordWriter or NULL, if the Java RecordWriter should be - * used. - */ - virtual RecordWriter* createRecordWriter(BSPContext& context) const { - return NULL; - } - virtual ~Factory() {} }; + /********************************************/ + /***************** toString *****************/ + /********************************************/ /** * Generic toString */ - template + template string toString(const T& t) { std::ostringstream oss; @@ -612,94 +814,100 @@ return t; } + /********************************************/ + /*************** Serialization **************/ + /********************************************/ /** * Generic serialization */ - template - void serialize(T t, OutStream& stream) { - serializeString(toString(t), stream); + template + void serialize(T t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeString(toString(t), stream); } /** * Generic serialization template specializations */ - template <> void serialize(int32_t t, OutStream& stream) { - serializeInt(t, stream); + template <> void serialize(int32_t t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeInt(t, stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::serializeInt '%d'\n", t); } } - template <> void serialize(int64_t t, OutStream& stream) { - serializeLong(t, stream); + template <> void serialize(int64_t t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeLong(t, stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::serializeLong '%ld'\n", (long)t); } } - template <> void serialize(float t, OutStream& stream) { - serializeFloat(t, stream); + template <> void serialize(float t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeFloat(t, stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::serializeFloat '%f'\n", t); } } - template <> void serialize(double t, OutStream& stream) { - serializeDouble(t, stream); + template <> void serialize(double t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeDouble(t, stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::serializeDouble '%f'\n", t); } } - template <> void serialize(string t, OutStream& stream) { - serializeString(t, stream); + template <> void serialize(string t, HadoopUtils::OutStream& stream) { + HadoopUtils::serializeString(t, stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::serializeString '%s'\n", t.c_str()); } } + /********************************************/ + /************** Deserialization *************/ + /********************************************/ /** * Generic deserialization */ - template - T deserialize(InStream& stream) { + template + T deserialize(HadoopUtils::InStream& stream) { string str = "Not able to deserialize type: "; - throw Error(str.append(typeid(T).name())); + throw HadoopUtils::Error(str.append(typeid(T).name())); } /** * Generic deserialization template specializations */ - template <> int32_t deserialize(InStream& stream) { - int32_t result = deserializeInt(stream); + template <> int32_t deserialize(HadoopUtils::InStream& stream) { + int32_t result = HadoopUtils::deserializeInt(stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeInt result: '%d'\n", result); } return result; } - template <> int64_t deserialize(InStream& stream) { - int64_t result = deserializeLong(stream); + template <> int64_t deserialize(HadoopUtils::InStream& stream) { + int64_t result = HadoopUtils::deserializeLong(stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeLong result: '%ld'\n", (long)result); } return result; } - template <> float deserialize(InStream& stream) { - float result = deserializeFloat(stream); + template <> float deserialize(HadoopUtils::InStream& stream) { + float result = HadoopUtils::deserializeFloat(stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeFloat result: '%f'\n", result); } return result; } - template <> double deserialize(InStream& stream) { - double result = deserializeDouble(stream); + template <> double deserialize(HadoopUtils::InStream& stream) { + double result = HadoopUtils::deserializeDouble(stream); if (logging) { fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeDouble result: '%f'\n", result); } return result; } - template <> string deserialize(InStream& stream) { - string result = deserializeString(stream); + template <> string deserialize(HadoopUtils::InStream& stream) { + string result = HadoopUtils::deserializeString(stream); if (logging) { if (result.empty()) { @@ -712,17 +920,20 @@ return result; } + /********************************************/ + /*********** runTask entry method ***********/ + /********************************************/ /** * Run the assigned task in the framework. * The user's main function should set the various functions using the * set* functions above and then call this. * @return true, if the task succeeded. */ - template + template bool runTask(const Factory& factory); // Include implementation in header because of templates - #include "../../impl/Pipes.cc" +#include "../../impl/Pipes.cc" } #endif Index: core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (working copy) @@ -24,8 +24,10 @@ import java.io.InputStream; import java.util.AbstractMap; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +37,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -55,12 +56,14 @@ private BinaryProtocol binProtocol; private BSPPeer peer = null; private Configuration conf; + private FileSystem fs; protected DataInputStream inStream; protected DataOutputStream outStream; private Map>> sequenceFileReaders; private Map>> sequenceFileWriters; + private Set sequenceFileWriterPaths; public UplinkReader( BinaryProtocol binaryProtocol, @@ -68,6 +71,7 @@ this.binProtocol = binaryProtocol; this.conf = conf; + this.fs = FileSystem.get(conf); this.inStream = new DataInputStream(new BufferedInputStream(stream, BinaryProtocol.BUFFER_SIZE)); @@ -76,6 +80,7 @@ this.sequenceFileReaders = new HashMap>>(); this.sequenceFileWriters = new HashMap>>(); + this.sequenceFileWriterPaths = new HashSet(); } public UplinkReader( @@ -99,10 +104,9 @@ } int cmd = readCommand(); - if (cmd == -1) { - continue; - } - LOG.debug("Handling uplink command: " + MessageType.values()[cmd]); + LOG.debug("Handling uplink command: " + cmd); + // MessageType.values()[cmd] may cause NullPointerException (bad + // command) if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING writeKeyValue(); @@ -165,10 +169,11 @@ } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING partitionResponse(); } else { - throw new IOException("Bad command code: " + cmd); + throw new Exception("Bad command code: " + cmd); } } catch (InterruptedException e) { + onError(e); return; } catch (Throwable e) { onError(e); @@ -180,6 +185,13 @@ // onError is overwritten by StreamingProtocol in Hama Streaming protected void onError(Throwable e) { LOG.error(StringUtils.stringifyException(e)); + + // notify binaryProtocol and set Exception + synchronized (binProtocol.hasTaskLock) { + binProtocol.setUplinkException(e); + binProtocol.setHasTask(false); + binProtocol.hasTaskLock.notify(); + } } // readCommand is overwritten by StreamingProtocol in Hama Streaming @@ -188,7 +200,20 @@ } public void closeConnection() throws IOException { + // close input stream this.inStream.close(); + + // close open SequenceFileReaders + for (int fileID : this.sequenceFileReaders.keySet()) { + LOG.debug("close SequenceFileReader: " + fileID); + this.sequenceFileReaders.get(fileID).getKey().close(); + } + + // close open SequenceFileWriters + for (int fileID : this.sequenceFileWriters.keySet()) { + LOG.debug("close SequenceFileWriter: " + fileID); + this.sequenceFileWriters.get(fileID).getKey().close(); + } } public void reopenInput() throws IOException { @@ -261,14 +286,15 @@ public void getAllPeerNames() throws IOException { LOG.debug("Got MessageType.GET_ALL_PEERNAME"); + String[] peerNames = peer.getAllPeerNames(); WritableUtils.writeVInt(this.outStream, MessageType.GET_ALL_PEERNAME.code); - WritableUtils.writeVInt(this.outStream, peer.getAllPeerNames().length); - for (String s : peer.getAllPeerNames()) { + WritableUtils.writeVInt(this.outStream, peerNames.length); + for (String s : peerNames) { Text.writeString(this.outStream, s); } binProtocol.flush(); LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " - + peer.getAllPeerNames().length); + + peerNames.length); } public void sync() throws IOException, SyncException, InterruptedException { @@ -283,15 +309,18 @@ public void getMessage() throws IOException { LOG.debug("Got MessageType.GET_MSG"); - WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code); Writable message = peer.getCurrentMessage(); if (message != null) { + WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code); binProtocol.writeObject(message); + LOG.debug("Responded MessageType.GET_MSG - Message: " + + ((message.toString().length() < 10) ? message.toString() : message + .toString().substring(0, 9) + "...")); + } else { + WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); + LOG.debug("Responded MessageType.END_OF_DATA"); } binProtocol.flush(); - LOG.debug("Responded MessageType.GET_MSG - Message: " - + ((message.toString().length() < 10) ? message.toString() : message - .toString().substring(0, 9) + "...")); } public void getMessageCount() throws IOException { @@ -408,7 +437,7 @@ WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code); binProtocol.flush(); LOG.debug("Responded MessageType.WRITE_KEYVALUE"); - + LOG.debug("Done MessageType.WRITE_KEYVALUE -" + " Key: " + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut @@ -432,32 +461,44 @@ int fileID = -1; - FileSystem fs = FileSystem.get(conf); if (option.equals("r")) { SequenceFile.Reader reader; try { reader = new SequenceFile.Reader(fs, new Path(path), conf); - // try to load key and value class - Class sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); - Class sequenceValueClass = conf.getClassLoader().loadClass( - valueClass); + if (reader.getKeyClassName().equals(keyClass) + && reader.getValueClassName().equals(valueClass)) { + // try to load key and value class + Class sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); + Class sequenceValueClass = conf.getClassLoader().loadClass( + valueClass); - // try to instantiate key and value class - Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance( - sequenceKeyClass, conf); - Writable sequenceValueWritable = (Writable) ReflectionUtils - .newInstance(sequenceValueClass, conf); + // try to instantiate key and value class + Writable sequenceKeyWritable = (Writable) ReflectionUtils + .newInstance(sequenceKeyClass, conf); + Writable sequenceValueWritable = (Writable) ReflectionUtils + .newInstance(sequenceValueClass, conf); - // put new fileID and key and value Writable instances into HashMap - fileID = reader.hashCode(); - sequenceFileReaders - .put( - fileID, - new AbstractMap.SimpleEntry>( - reader, new AbstractMap.SimpleEntry( - sequenceKeyWritable, sequenceValueWritable))); + // put new fileID and key and value Writable instances into HashMap + fileID = reader.hashCode(); + this.sequenceFileReaders + .put( + fileID, + new AbstractMap.SimpleEntry>( + reader, new AbstractMap.SimpleEntry( + sequenceKeyWritable, sequenceValueWritable))); + } else { // keyClass or valueClass is wrong + fileID = -1; + if (!reader.getKeyClassName().equals(keyClass)) { + LOG.error("SEQFILE_OPEN - Wrong KeyClass: " + keyClass + + " File KeyClass: " + reader.getKeyClassName()); + } else { + LOG.error("SEQFILE_OPEN - Wrong ValueClass: " + valueClass + + " File ValueClass: " + reader.getValueClassName()); + } + } + } catch (IOException e) { LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; @@ -469,30 +510,43 @@ } else if (option.equals("w")) { SequenceFile.Writer writer; try { + // SequenceFile.Writer has an exclusive lease for a file + // No other client can write to this file until other Writer has + // completed + if (!this.sequenceFileWriterPaths.contains(path)) { - // try to load key and value class - Class sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); - Class sequenceValueClass = conf.getClassLoader().loadClass( - valueClass); + // try to load key and value class + Class sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); + Class sequenceValueClass = conf.getClassLoader().loadClass( + valueClass); - writer = new SequenceFile.Writer(fs, conf, new Path(path), - sequenceKeyClass, sequenceValueClass); + // try to instantiate key and value class + Writable sequenceKeyWritable = (Writable) ReflectionUtils + .newInstance(sequenceKeyClass, conf); + Writable sequenceValueWritable = (Writable) ReflectionUtils + .newInstance(sequenceValueClass, conf); - // try to instantiate key and value class - Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance( - sequenceKeyClass, conf); - Writable sequenceValueWritable = (Writable) ReflectionUtils - .newInstance(sequenceValueClass, conf); + writer = new SequenceFile.Writer(fs, conf, new Path(path), + sequenceKeyClass, sequenceValueClass); - // put new fileID and key and value Writable instances into HashMap - fileID = writer.hashCode(); - sequenceFileWriters - .put( - fileID, - new AbstractMap.SimpleEntry>( - writer, new AbstractMap.SimpleEntry( - sequenceKeyWritable, sequenceValueWritable))); + // put new fileID and key and value Writable instances into HashMap + fileID = writer.hashCode(); + this.sequenceFileWriters + .put( + fileID, + new AbstractMap.SimpleEntry>( + writer, new AbstractMap.SimpleEntry( + sequenceKeyWritable, sequenceValueWritable))); + // add path to set (exclusive access) + this.sequenceFileWriterPaths.add(path); + + } else { // Path was already opened by another SequenceFile.Writer + fileID = -1; + LOG.error("SEQFILE_OPEN - Path: " + path + + " is already used by another Writer!"); + } + } catch (IOException e) { LOG.error("SEQFILE_OPEN - " + e.getMessage()); fileID = -1; @@ -515,7 +569,7 @@ LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID); // check if fileID is available in sequenceFileReader - if (sequenceFileReaders.containsKey(fileID)) { + if (this.sequenceFileReaders.containsKey(fileID)) { Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue() .getKey(); @@ -523,7 +577,7 @@ .getValue().getValue(); // try to read next key/value pair from SequenceFile.Reader - if (sequenceFileReaders.get(fileID).getKey() + if (this.sequenceFileReaders.get(fileID).getKey() .next(sequenceKeyWritable, sequenceValueWritable)) { WritableUtils.writeVInt(this.outStream, @@ -542,14 +596,14 @@ + "...")); } else { // false when at end of file - WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA"); } binProtocol.flush(); } else { // no fileID stored - LOG.warn("SequenceFileReader: FileID " + fileID + " not found!"); + LOG.error("MessageType.SEQFILE_READNEXT: FileID " + fileID + + " not found!"); WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA"); binProtocol.flush(); @@ -563,7 +617,7 @@ boolean result = false; // check if fileID is available in sequenceFileWriter - if (sequenceFileWriters.containsKey(fileID)) { + if (this.sequenceFileWriters.containsKey(fileID)) { Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue() .getKey(); @@ -577,7 +631,7 @@ if ((sequenceKeyWritable != null) && (sequenceValueWritable != null)) { // append to sequenceFile - sequenceFileWriters.get(fileID).getKey() + this.sequenceFileWriters.get(fileID).getKey() .append(sequenceKeyWritable, sequenceValueWritable); LOG.debug("Stored data: Key: " @@ -591,6 +645,13 @@ result = true; } + } else { // no fileID stored + + // Skip written data from InputStream + int availableBytes = this.inStream.available(); + this.inStream.skip(availableBytes); + LOG.debug("MessageType.SEQFILE_APPEND: skip " + availableBytes + " bytes"); + LOG.error("MessageType.SEQFILE_APPEND: FileID " + fileID + " not found!"); } // RESPOND @@ -606,12 +667,16 @@ boolean result = false; - if (sequenceFileReaders.containsKey(fileID)) { - sequenceFileReaders.get(fileID).getKey().close(); + if (this.sequenceFileReaders.containsKey(fileID)) { + this.sequenceFileReaders.get(fileID).getKey().close(); + this.sequenceFileReaders.remove(fileID); result = true; - } else if (sequenceFileWriters.containsKey(fileID)) { - sequenceFileWriters.get(fileID).getKey().close(); + } else if (this.sequenceFileWriters.containsKey(fileID)) { + this.sequenceFileWriters.get(fileID).getKey().close(); + this.sequenceFileWriters.remove(fileID); result = true; + } else { // no fileID stored + LOG.error("MessageType.SEQFILE_CLOSE: FileID " + fileID + " not found!"); } // RESPOND @@ -663,9 +728,6 @@ } else if (obj instanceof LongWritable) { ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream)); - } else if (obj instanceof NullWritable) { - throw new IOException("Cannot read data into NullWritable!"); - } else { try { LOG.debug("reading type: " + obj.getClass().getName()); Index: core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.pipes.Submitter; @@ -61,6 +62,7 @@ public final Object hasTaskLock = new Object(); private boolean hasTask = false; + private Throwable uplinkException = null; public final Object resultLock = new Object(); private Integer resultInt = null; @@ -128,6 +130,10 @@ return new UplinkReader(this, peer, in); } + public void setUplinkException(Throwable e) { + this.uplinkException = e; + } + public boolean isHasTask() { return this.hasTask; } @@ -223,36 +229,24 @@ } @Override - public void runSetup(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runSetup() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_SETUP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_SETUP"); } @Override - public void runBsp(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runBsp() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_BSP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_BSP"); } @Override - public void runCleanup(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runCleanup() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_CLEANUP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_CLEANUP"); @@ -279,7 +273,7 @@ synchronized (this.resultLock) { try { while (resultInt == null) { - this.resultLock.wait(); + this.resultLock.wait(); // this call blocks } resultVal = resultInt; @@ -329,17 +323,19 @@ @Override public boolean waitForFinish() throws IOException, InterruptedException { - // LOG.debug("waitForFinish... "+hasTask); + // LOG.debug("waitForFinish... " + hasTask); synchronized (this.hasTaskLock) { - try { - while (this.hasTask) - this.hasTaskLock.wait(); - } catch (InterruptedException e) { - LOG.error(e); + while (this.hasTask) { + this.hasTaskLock.wait(); // this call blocks } - } + // Check if UplinkReader thread has thrown exception + if (uplinkException != null) { + throw new InterruptedException( + StringUtils.stringifyException(uplinkException)); + } + } return hasTask; } Index: core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (working copy) @@ -58,29 +58,23 @@ /** * runSetup * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException; + void runSetup() throws IOException; /** * runBsp * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException; + void runBsp() throws IOException; /** * runCleanup * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException; + void runCleanup() throws IOException; /** * getPartition Index: core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (working copy) @@ -244,15 +244,13 @@ } @Override - public void runSetup(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runSetup() throws IOException { writeLine(MessageType.RUN_SETUP, null); waitOnAck(); } @Override - public void runBsp(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runBsp() throws IOException { writeLine(MessageType.RUN_BSP, null); waitOnAck(); } @@ -269,8 +267,7 @@ } @Override - public void runCleanup(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runCleanup() throws IOException { writeLine(MessageType.RUN_CLEANUP, null); waitOnAck(); } Index: core/src/main/java/org/apache/hama/pipes/PipesApplication.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesApplication.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/PipesApplication.java (working copy) @@ -53,8 +53,8 @@ * */ public class PipesApplication { - private static final Log LOG = LogFactory.getLog(PipesApplication.class); + private static final int SERVER_SOCKET_TIMEOUT = 2000; private ServerSocket serverSocket; private Process process; private Socket clientSocket; @@ -85,11 +85,9 @@ // add TMPDIR environment variable with the value of java.io.tmpdir env.put("TMPDIR", System.getProperty("java.io.tmpdir")); - /* Set Logging Environment from Configuration */ + // Set Logging Environment from Configuration env.put("hama.pipes.logging", conf.getBoolean("hama.pipes.logging", false) ? "1" : "0"); - LOG.debug("DEBUG hama.pipes.logging: " - + conf.getBoolean("hama.pipes.logging", false)); return env; } @@ -213,7 +211,7 @@ if (!streamingEnabled) { LOG.debug("DEBUG: waiting for Client at " + serverSocket.getLocalSocketAddress()); - serverSocket.setSoTimeout(2000); + serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT); clientSocket = serverSocket.accept(); LOG.debug("DEBUG: Client connected! - start BinaryProtocol!"); @@ -234,7 +232,7 @@ br.close(); throw new SocketException( - "Timout: Client pipes application was not connecting!"); + "Timout: Client pipes application did not connect!"); } } @@ -284,7 +282,7 @@ } else { LOG.debug("DEBUG: waiting for Client at " + serverSocket.getLocalSocketAddress()); - serverSocket.setSoTimeout(2000); + serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT); clientSocket = serverSocket.accept(); LOG.debug("DEBUG: Client connected! - start BinaryProtocol!"); @@ -305,7 +303,7 @@ br.close(); throw new SocketException( - "Timout: Client pipes application was not connecting!"); + "Timout: Client pipes application did not connect!"); } } Index: core/src/main/java/org/apache/hama/pipes/PipesBSP.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/PipesBSP.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/PipesBSP.java (working copy) @@ -19,8 +19,6 @@ import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSP; @@ -34,24 +32,27 @@ public class PipesBSP extends BSP { - private static final Log LOG = LogFactory.getLog(PipesBSP.class); private PipesApplication application = new PipesApplication(); + private boolean applicationIsAlive = true; @Override public void setup(BSPPeer peer) throws IOException, SyncException, InterruptedException { - this.application.start(peer); + try { + this.application.start(peer); - this.application.getDownlink().runSetup(false, false); + this.application.getDownlink().runSetup(); - try { this.application.waitForFinish(); + } catch (IOException e) { - LOG.error(e); + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); + this.application.cleanup(false); + throw e; } } @@ -59,15 +60,20 @@ public void bsp(BSPPeer peer) throws IOException, SyncException, InterruptedException { - this.application.getDownlink().runBsp(false, false); + try { + this.application.getDownlink().runBsp(); - try { this.application.waitForFinish(); + } catch (IOException e) { - LOG.error(e); + applicationIsAlive = false; + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); + applicationIsAlive = false; + this.application.cleanup(false); + throw e; } } @@ -83,17 +89,24 @@ public void cleanup(BSPPeer peer) throws IOException { - application.getDownlink().runCleanup(false, false); + try { + if (applicationIsAlive) { - try { - this.application.waitForFinish(); + this.application.getDownlink().runCleanup(); + + this.application.waitForFinish(); + + this.application.cleanup(true); + } } catch (IOException e) { - LOG.error(e); + applicationIsAlive = false; + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - this.application.cleanup(true); + applicationIsAlive = false; + this.application.cleanup(false); + throw new IOException(e); } } Index: core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (working copy) @@ -107,6 +107,10 @@ Path path = new Path(cmdLine.getOptionValue("file")); FileSystem fs = FileSystem.get(path.toUri(), conf); + if (!fs.isFile(path)) { + System.out.println("File does not exist: " + path.toString()); + return; + } SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); Writer writer; Index: core/src/main/java/org/apache/hama/pipes/Submitter.java =================================================================== --- core/src/main/java/org/apache/hama/pipes/Submitter.java (revision 1556897) +++ core/src/main/java/org/apache/hama/pipes/Submitter.java (working copy) @@ -109,46 +109,6 @@ } /** - * Set whether the job is using a Java RecordReader. - * - * @param conf the configuration to modify - * @param value the new value - */ - public static void setIsJavaRecordReader(Configuration conf, boolean value) { - conf.setBoolean("hama.pipes.java.recordreader", value); - } - - /** - * Check whether the job is using a Java RecordReader - * - * @param conf the configuration to check - * @return is it a Java RecordReader? - */ - public static boolean getIsJavaRecordReader(Configuration conf) { - return conf.getBoolean("hama.pipes.java.recordreader", false); - } - - /** - * Set whether the job will use a Java RecordWriter. - * - * @param conf the configuration to modify - * @param value the new value to set - */ - public static void setIsJavaRecordWriter(Configuration conf, boolean value) { - conf.setBoolean("hama.pipes.java.recordwriter", value); - } - - /** - * Will the job use a Java RecordWriter? - * - * @param conf the configuration to check - * @return true, if the output of the job will be written by Java - */ - public static boolean getIsJavaRecordWriter(Configuration conf) { - return conf.getBoolean("hama.pipes.java.recordwriter", false); - } - - /** * Set the configuration, if it doesn't already have a value for the given * key. * @@ -237,8 +197,6 @@ setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job"); // DEBUG Output - LOG.debug("isJavaRecordReader: " - + getIsJavaRecordReader(job.getConfiguration())); LOG.debug("BspClass: " + job.getBspClass().getName()); // conf.setInputFormat(NLineInputFormat.class); LOG.debug("InputFormat: " + job.getInputFormat()); @@ -440,7 +398,6 @@ } if (results.hasOption("inputformat")) { - setIsJavaRecordReader(job.getConfiguration(), true); job.setInputFormat(getClass(results, "inputformat", conf, InputFormat.class)); } @@ -451,7 +408,6 @@ } if (results.hasOption("outputformat")) { - setIsJavaRecordWriter(job.getConfiguration(), true); job.setOutputFormat(getClass(results, "outputformat", conf, OutputFormat.class)); } Index: core/src/test/java/org/apache/hama/pipes/TestPipes.java =================================================================== --- core/src/test/java/org/apache/hama/pipes/TestPipes.java (revision 1556897) +++ core/src/test/java/org/apache/hama/pipes/TestPipes.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hama.Constants; @@ -44,6 +45,7 @@ import org.apache.hama.bsp.FileInputFormat; import org.apache.hama.bsp.FileOutputFormat; import org.apache.hama.bsp.KeyValueTextInputFormat; +import org.apache.hama.bsp.NullInputFormat; import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; import org.apache.hama.bsp.message.MessageManager; @@ -62,6 +64,7 @@ public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install"; public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation"; + public static final String EXAMPLE_PIESTIMATOR_EXEC = "/examples/piestimator"; public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication"; public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/"; public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/"; @@ -70,6 +73,7 @@ private HamaConfiguration configuration; private static FileSystem fs = null; + private String examplesInstallPath; public TestPipes() { configuration = new HamaConfiguration(); @@ -118,21 +122,25 @@ + " is empty! Skipping TestPipes!"); return; } + this.examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); // *** Summation Test *** summation(); + // *** PiEstimator Test *** + piestimation(); + // *** MatrixMultiplication Test *** matrixMult(); - + // Remove local temp folder cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT)); } private void summation() throws Exception { // Setup Paths - String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); - Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC); + Path summationExec = new Path(this.examplesInstallPath + + EXAMPLE_SUMMATION_EXEC); Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/in"); Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/out"); @@ -144,15 +152,36 @@ outputPath, 1, this.numOfGroom); // Verify output - verifySummationOutput(configuration, outputPath, sum); + verifyOutput(configuration, outputPath, sum.doubleValue(), + Math.pow(10, (DOUBLE_PRECISION * -1))); + // Clean input and output folder cleanup(fs, inputPath); cleanup(fs, outputPath); } + private void piestimation() throws Exception { + // Setup Paths + Path piestimatorExec = new Path(this.examplesInstallPath + + EXAMPLE_PIESTIMATOR_EXEC); + Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/in"); + Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/out"); + + // Run PiEstimator example + runProgram(getPiestimatorJob(configuration), piestimatorExec, inputPath, + outputPath, 3, this.numOfGroom); + + // Verify output + verifyOutput(configuration, outputPath, Math.PI, Math.pow(10, (2 * -1))); + + // Clean input and output folder + cleanup(fs, inputPath); + cleanup(fs, outputPath); + } + private void matrixMult() throws Exception { - String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); - Path matrixmultiplicationExec = new Path(examplesInstallPath + // Setup Paths + Path matrixmultiplicationExec = new Path(this.examplesInstallPath + EXAMPLE_MATRIXMULTIPLICATION_EXEC); Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/in"); @@ -192,10 +221,19 @@ bsp.setInputKeyClass(Text.class); bsp.setInputValueClass(Text.class); bsp.setOutputFormat(SequenceFileOutputFormat.class); - bsp.setOutputKeyClass(Text.class); + bsp.setOutputKeyClass(NullWritable.class); bsp.setOutputValueClass(DoubleWritable.class); bsp.set("bsp.message.class", DoubleWritable.class.getName()); + return bsp; + } + static BSPJob getPiestimatorJob(HamaConfiguration conf) throws IOException { + BSPJob bsp = new BSPJob(conf); + bsp.setInputFormat(NullInputFormat.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setOutputKeyClass(NullWritable.class); + bsp.setOutputValueClass(DoubleWritable.class); + bsp.set("bsp.message.class", IntWritable.class.getName()); return bsp; } @@ -211,10 +249,10 @@ bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts"); bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName()); - + bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); bsp.setPartitioner(PipesPartitioner.class); - + // sort sent messages bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS, "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol"); @@ -256,7 +294,6 @@ * rand.nextDouble(); matrix[i][j] = new BigDecimal(randomValue).setScale(DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).doubleValue(); - // matrix[i][j] = rand.nextInt(9) + 1; } } return matrix; @@ -350,22 +387,21 @@ } } - static void verifySummationOutput(HamaConfiguration conf, Path outputPath, - BigDecimal sum) throws IOException { + static void verifyOutput(HamaConfiguration conf, Path outputPath, + double expectedResult, double delta) throws IOException { FileStatus[] listStatus = fs.listStatus(outputPath); for (FileStatus status : listStatus) { if (!status.isDir()) { SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf); - Text key = new Text(); + NullWritable key = NullWritable.get(); DoubleWritable value = new DoubleWritable(); if (reader.next(key, value)) { LOG.info("Output File: " + status.getPath()); LOG.info("key: '" + key + "' value: '" + value + "' expected: '" - + sum.doubleValue() + "'"); - assertEquals("Expected value: '" + sum + "' != '" + value + "'", - sum.doubleValue(), value.get(), - Math.pow(10, (DOUBLE_PRECISION * -1))); + + expectedResult + "'"); + assertEquals("Expected value: '" + expectedResult + "' != '" + value + + "'", expectedResult, value.get(), delta); } reader.close(); } @@ -417,9 +453,6 @@ FileInputFormat.setInputPaths(bsp, inputPath); FileOutputFormat.setOutputPath(bsp, outputPath); - Submitter.setIsJavaRecordReader(conf, true); - Submitter.setIsJavaRecordWriter(conf, true); - BSPJobClient jobClient = new BSPJobClient(conf); // Set bspTaskNum